What's going on with the change from sendAck(int) to send(int) in Peer.java?

On Mon, Feb 05, 2007 at 11:52:16AM +0000, mrogers at freenetproject.org wrote:
> Author: mrogers
> Date: 2007-02-05 11:52:15 +0000 (Mon, 05 Feb 2007)
> New Revision: 11679
> 
> Modified:
>    trunk/apps/load-balancing-sims/phase7/sim/Node.java
>    trunk/apps/load-balancing-sims/phase7/sim/Packet.java
>    trunk/apps/load-balancing-sims/phase7/sim/Peer.java
>    trunk/apps/load-balancing-sims/phase7/sim/Sim.java
>    trunk/apps/load-balancing-sims/phase7/sim/clients/Client.java
>    trunk/apps/load-balancing-sims/phase7/sim/clients/SimplePublisher.java
>    trunk/apps/load-balancing-sims/phase7/sim/messages/ChkInsert.java
>    trunk/apps/load-balancing-sims/phase7/sim/messages/Search.java
>    trunk/apps/load-balancing-sims/phase7/sim/messages/SskInsert.java
> Log:
> Disabled ack coalescing in preparation for improved RTO/FRTO calculation
> 
> Modified: trunk/apps/load-balancing-sims/phase7/sim/Node.java
> ===================================================================
> --- trunk/apps/load-balancing-sims/phase7/sim/Node.java       2007-02-05 00:36:43 UTC (rev 11678)
> +++ trunk/apps/load-balancing-sims/phase7/sim/Node.java       2007-02-05 11:52:15 UTC (rev 11679)
> @@ -1,5 +1,5 @@
>  package sim;
> -import sim.generators.Client;
> +import sim.clients.Client;
>  import sim.handlers.*;
>  import sim.messages.*;
>  import java.util.HashMap;
> 
> Modified: trunk/apps/load-balancing-sims/phase7/sim/Packet.java
> ===================================================================
> --- trunk/apps/load-balancing-sims/phase7/sim/Packet.java     2007-02-05 00:36:43 UTC (rev 11678)
> +++ trunk/apps/load-balancing-sims/phase7/sim/Packet.java     2007-02-05 11:52:15 UTC (rev 11679)
> @@ -1,40 +1,32 @@
>  // A low-level packet (as opposed to a high-level message)
>  
>  package sim;
> -import sim.messages.Ack;
>  import sim.messages.Message;
>  import java.util.ArrayList;
>  
>  class Packet
>  {
> -     public final static int HEADER_SIZE = 70; // Including IP & UDP headers
> -     public final static int ACK_SIZE = 4; // Size of an ack in bytes
> +     public final static int HEADER_SIZE = 60; // Including IP & UDP headers
>       public final static int MAX_SIZE = 1450; // MTU including headers
>       public final static int SENSIBLE_PAYLOAD = 1000; // Coalescing
>       
>       public final int src, dest; // Network addresses
>       public int size = HEADER_SIZE; // Size in bytes, including headers
>       public int seq = -1; // Data sequence number (-1 if no data)
> -     public ArrayList<Ack> acks = null;
> +     public int ack = -1; // Ack sequence number (-1 if no ack)
>       public ArrayList<Message> messages = null;
>       
>       public double sent; // Time at which the packet was (re) transmitted
>       public double latency; // Link latency, stored here for convenience
>       
> -     public Packet (int src, int dest, double latency)
> +     public Packet (int src, int dest, double latency, int ack)
>       {
>               this.src = src;
>               this.dest = dest;
>               this.latency = latency;
> +             this.ack = ack;
>       }
>       
> -     public void addAck (Ack a)
> -     {
> -             if (acks == null) acks = new ArrayList<Ack>();
> -             acks.add (a);
> -             size += a.size();
> -     }
> -     
>       public void addMessage (Message m)
>       {
>               if (messages == null) messages = new ArrayList<Message>();
> 
> Modified: trunk/apps/load-balancing-sims/phase7/sim/Peer.java
> ===================================================================
> --- trunk/apps/load-balancing-sims/phase7/sim/Peer.java       2007-02-05 00:36:43 UTC (rev 11678)
> +++ trunk/apps/load-balancing-sims/phase7/sim/Peer.java       2007-02-05 11:52:15 UTC (rev 11679)
> @@ -36,7 +36,6 @@
>       private int txSeq = 0; // Sequence number of next outgoing data packet
>       private int txMaxSeq = SEQ_RANGE - 1; // Highest sequence number
>       private LinkedList<Packet> txBuffer; // Retransmission buffer
> -     private DeadlineQueue<Ack> ackQueue; // Outgoing acks
>       private DeadlineQueue<Message> searchQueue; // Outgoing search messages
>       private DeadlineQueue<Message> transferQueue; // Outgoing transfers
>       private CongestionWindow window; // AIMD congestion window
> @@ -61,7 +60,6 @@
>               this.location = location;
>               this.latency = latency;
>               txBuffer = new LinkedList<Packet>();
> -             ackQueue = new DeadlineQueue<Ack>();
>               searchQueue = new DeadlineQueue<Message>();
>               transferQueue = new DeadlineQueue<Message>();
>               window = new CongestionWindow (this);
> @@ -83,20 +81,9 @@
>               // Start the coalescing timer
>               startTimer();
>               // Send as many packets as possible
> -             while (send());
> +             while (send (-1));
>       }
>       
> -     // Queue an ack for transmission
> -     private void sendAck (int seq)
> -     {
> -             if (LOG) log ("ack " + seq + " added to ack queue");
> -             ackQueue.add (new Ack (seq, Event.time() + MAX_DELAY));
> -             // Start the coalescing timer
> -             startTimer();
> -             // Send as many packets as possible
> -             while (send());
> -     }
> -     
>       // Start the coalescing timer
>       private void startTimer()
>       {
> @@ -107,11 +94,11 @@
>       }
>       
>       // Try to send a packet, return true if a packet was sent
> -     private boolean send()
> +     private boolean send (int ack)
>       {
> -             int waiting = ackQueue.size+searchQueue.size+transferQueue.size;
> +             int waiting = searchQueue.size + transferQueue.size;
>               if (LOG) log (waiting + " bytes waiting");
> -             if (waiting == 0) return false;
> +             if (ack == -1 && waiting == 0) return false;
>               
>               // Return to slow start when the link is idle
>               double now = Event.time();
> @@ -123,29 +110,30 @@
>               size = Math.min (size, node.bandwidth.available());
>               if (LOG) log (size + " bytes available for packet");
>               
> -             // Urgent acks to send?
> -             if (ackQueue.deadline() <= now) return sendPacket (size);
> +             // Ack to send?
> +             if (ack != -1) return sendPacket (ack, size);
>               // Urgent searches and room to send them?
>               if (searchQueue.deadline() <= now
> -             && searchQueue.headSize() <= size) return sendPacket (size);
> +             && searchQueue.headSize() <= size)
> +                     return sendPacket (ack, size);
>               // Urgent transfers and room to send them?
>               if (transferQueue.deadline() <= now
> -             && transferQueue.headSize() <= size) return sendPacket (size);
> +             && transferQueue.headSize() <= size)
> +                     return sendPacket (ack, size);
>               // Enough non-urgent messages for a large packet, and room?
>               if (waiting >= Packet.SENSIBLE_PAYLOAD
> -             && size >= Packet.SENSIBLE_PAYLOAD) return sendPacket (size);
> +             && size >= Packet.SENSIBLE_PAYLOAD)
> +                     return sendPacket (ack, size);
>               
>               if (LOG) log ("not sending a packet");
>               return false;
>       }
>       
>       // Try to send a packet up to the specified size, return true if sent
> -     private boolean sendPacket (int maxSize)
> +     private boolean sendPacket (int ack, int maxSize)
>       {
>               // Construct a packet
> -             Packet p = new Packet (node.net.address, address, latency);
> -             // Add all waiting acks to the packet
> -             while (ackQueue.size > 0) p.addAck (ackQueue.pop());
> +             Packet p = new Packet (node.net.address, address, latency, ack);
>               if (LOG) log ((maxSize - p.size) + " bytes for messages");
>               // Don't allow more than SEQ_RANGE payloads to be in flight
>               if (txSeq <= txMaxSeq) {
> @@ -169,7 +157,7 @@
>                       log ("waiting for ack " + (txMaxSeq - SEQ_RANGE + 1));
>               }
>               // Don't send empty packets
> -             if (p.acks == null && p.messages == null) return false;
> +             if (p.ack == -1 && p.messages == null) return false;
>               // Transmit the packet
>               if (LOG) log ("sending packet " +p.seq+ ", " +p.size+ " bytes");
>               node.sendPacket (p);
> @@ -186,7 +174,7 @@
>       // Called by Node when a packet arrives
>       public void handlePacket (Packet p)
>       {
> -             if (p.acks != null) for (Ack a : p.acks) handleAck (a);
> +             if (p.ack != -1) handleAck (p.ack);
>               if (p.messages != null) handleData (p);
>       }
>       
> @@ -195,7 +183,7 @@
>               if (LOG) log ("received packet " +p.seq+ ", expected " +rxSeq);
>               if (p.seq < rxSeq || rxDupe.contains (p.seq)) {
>                       if (LOG) log ("duplicate packet");
> -                     sendAck (p.seq); // Original ack may have been lost
> +                     send (p.seq); // Original ack may have been lost
>               }
>               else if (p.seq == rxSeq) {
>                       // Find the sequence number of the next missing packet
> @@ -204,7 +192,7 @@
>                       // Deliver the messages to the node
>                       for (Message m : p.messages)
>                               node.handleMessage (m, this);
> -                     sendAck (p.seq);
> +                     send (p.seq);
>               }
>               else if (p.seq < rxSeq + SEQ_RANGE) {
>                       if (LOG) log ("packet out of order");
> @@ -212,37 +200,36 @@
>                       // Deliver the messages to the node
>                       for (Message m : p.messages)
>                               node.handleMessage (m, this);
> -                     sendAck (p.seq);
> +                     send (p.seq);
>               }
>               // This indicates a misbehaving sender - discard the packet
>               else if (LOG) log ("WARNING: sequence number out of range");
>       }
>       
> -     private void handleAck (Ack a)
> +     private void handleAck (int ack)
>       {
> -             int seq = a.id;
> -             if (LOG) log ("received ack " + seq);
> +             if (LOG) log ("received ack " + ack);
>               double now = Event.time();
>               Iterator<Packet> i = txBuffer.iterator();
>               while (i.hasNext()) {
>                       Packet p = i.next();
>                       double age = now - p.sent;
>                       // Explicit ack
> -                     if (p.seq == seq) {
> +                     if (p.seq == ack) {
>                               i.remove();
>                               // Update the congestion window
>                               window.bytesAcked (p.size);
>                               // Update the average round-trip time
>                               rtt = rtt * RTT_DECAY + age * (1.0 - RTT_DECAY);
>                               if (LOG) {
> -                                     log ("packet " +p.seq+ " acknowledged");
> +                                     log ("packet " + ack + " acknowledged");
>                                       log ("round-trip time " + age);
>                                       log ("average round-trip time " + rtt);
>                               }
>                               break;
>                       }
>                       // Fast retransmission
> -                     if (p.seq < seq && age > FRTO * rtt + MAX_DELAY) {
> +                     if (p.seq < ack && age > FRTO * rtt) {
>                               p.sent = now;
>                               if (LOG) log ("fast retransmitting " + p.seq);
>                               node.resendPacket (p);
> @@ -253,8 +240,8 @@
>               if (txBuffer.isEmpty()) txMaxSeq = txSeq + SEQ_RANGE - 1;
>               else txMaxSeq = txBuffer.peek().seq + SEQ_RANGE - 1;
>               if (LOG) log ("maximum sequence number " + txMaxSeq);
> -             // Send as many packets a possible
> -             if (timerRunning) while (send());
> +             // Send as many packets as possible
> +             if (timerRunning) while (send (-1));
>               else checkDeadlines();
>       }
>       
> @@ -326,7 +313,7 @@
>               
>               double now = Event.time();
>               for (Packet p : txBuffer) {
> -                     if (now - p.sent > RTO * rtt + MAX_DELAY) {
> +                     if (now - p.sent > RTO * rtt) {
>                               // Retransmission timeout
>                               if (LOG) log ("retransmitting " + p.seq);
>                               p.sent = now;
> @@ -341,11 +328,11 @@
>       private void checkDeadlines()
>       {
>               // Send as many packets as possible
> -             while (send());
> +             while (send (-1));
>               // Find the next coalescing deadline - ignore message deadlines
>               // if there isn't room in the congestion window to send them
> -             double dl = ackQueue.deadline();
> -             int win = window.available() -Packet.HEADER_SIZE -ackQueue.size;
> +             double dl = Double.POSITIVE_INFINITY;
> +             int win = window.available() - Packet.HEADER_SIZE;
>               if (searchQueue.headSize() <= win)
>                       dl = Math.min (dl, searchQueue.deadline());
>               if (transferQueue.headSize() <= win)
> @@ -370,12 +357,9 @@
>       // Are we waiting for the bandwidth limiter?
>       private boolean shouldPoll()
>       {
> -             double now = Event.time();
> -             // Will we need to send an ack before the next polling interval?
> -             if (ackQueue.deadline() < now + node.bandwidth.poll)
> -                     return false;
>               double bw = node.bandwidth.available();
>               double win = window.available();
> +             double now = Event.time();
>               // Is there an overdue search that's waiting for bandwidth?
>               if (searchQueue.headSize() > bw
>               && searchQueue.headSize() <= win
> 
> Modified: trunk/apps/load-balancing-sims/phase7/sim/Sim.java
> ===================================================================
> --- trunk/apps/load-balancing-sims/phase7/sim/Sim.java        2007-02-05 00:36:43 UTC (rev 11678)
> +++ trunk/apps/load-balancing-sims/phase7/sim/Sim.java        2007-02-05 11:52:15 UTC (rev 11679)
> @@ -1,5 +1,5 @@
>  package sim;
> -import sim.generators.SimplePublisher;
> +import sim.clients.SimplePublisher;
>  
>  class Sim implements EventTarget
>  {
> 
> Modified: trunk/apps/load-balancing-sims/phase7/sim/clients/Client.java
> ===================================================================
> --- trunk/apps/load-balancing-sims/phase7/sim/clients/Client.java     2007-02-05 00:36:43 UTC (rev 11678)
> +++ trunk/apps/load-balancing-sims/phase7/sim/clients/Client.java     2007-02-05 11:52:15 UTC (rev 11679)
> @@ -1,4 +1,4 @@
> -package sim.generators;
> +package sim.clients;
>  import sim.messages.Search;
>  
>  public interface Client
> 
> Modified: trunk/apps/load-balancing-sims/phase7/sim/clients/SimplePublisher.java
> ===================================================================
> --- trunk/apps/load-balancing-sims/phase7/sim/clients/SimplePublisher.java    2007-02-05 00:36:43 UTC (rev 11678)
> +++ trunk/apps/load-balancing-sims/phase7/sim/clients/SimplePublisher.java    2007-02-05 11:52:15 UTC (rev 11679)
> @@ -1,7 +1,7 @@
>  // A simple publisher that inserts keys using a Poisson process and informs
>  // each reader after an average of ten minutes
>  
> -package sim.generators;
> +package sim.clients;
>  import sim.Event;
>  import sim.EventTarget;
>  import sim.Node;
> 
> Modified: trunk/apps/load-balancing-sims/phase7/sim/messages/ChkInsert.java
> ===================================================================
> --- trunk/apps/load-balancing-sims/phase7/sim/messages/ChkInsert.java 2007-02-05 00:36:43 UTC (rev 11678)
> +++ trunk/apps/load-balancing-sims/phase7/sim/messages/ChkInsert.java 2007-02-05 11:52:15 UTC (rev 11679)
> @@ -1,5 +1,5 @@
>  package sim.messages;
> -import sim.generators.Client;
> +import sim.clients.Client;
>  
>  public class ChkInsert extends Search
>  {
> 
> Modified: trunk/apps/load-balancing-sims/phase7/sim/messages/Search.java
> ===================================================================
> --- trunk/apps/load-balancing-sims/phase7/sim/messages/Search.java    2007-02-05 00:36:43 UTC (rev 11678)
> +++ trunk/apps/load-balancing-sims/phase7/sim/messages/Search.java    2007-02-05 11:52:15 UTC (rev 11679)
> @@ -1,5 +1,5 @@
>  package sim.messages;
> -import sim.generators.Client;
> +import sim.clients.Client;
>  
>  public class Search extends Message
>  {
> 
> Modified: trunk/apps/load-balancing-sims/phase7/sim/messages/SskInsert.java
> ===================================================================
> --- trunk/apps/load-balancing-sims/phase7/sim/messages/SskInsert.java 2007-02-05 00:36:43 UTC (rev 11678)
> +++ trunk/apps/load-balancing-sims/phase7/sim/messages/SskInsert.java 2007-02-05 11:52:15 UTC (rev 11679)
> @@ -1,5 +1,5 @@
>  package sim.messages;
> -import sim.generators.Client;
> +import sim.clients.Client;
>  
>  public class SskInsert extends Search
>  {
> 
> _______________________________________________
> cvs mailing list
> cvs at freenetproject.org
> http://emu.freenetproject.org/cgi-bin/mailman/listinfo/cvs
> 
-------------- next part --------------
A non-text attachment was scrubbed...
Name: signature.asc
Type: application/pgp-signature
Size: 189 bytes
Desc: Digital signature
URL: <https://emu.freenetproject.org/pipermail/devl/attachments/20070205/014645c0/attachment.pgp>

Reply via email to