Author: robert
Date: 2008-03-14 19:41:34 +0000 (Fri, 14 Mar 2008)
New Revision: 18536

Modified:
   trunk/freenet/src/freenet/io/xfer/PacketThrottle.java
Log:
grab transmit window space in FIFO order (possibly prevent starvation)


Modified: trunk/freenet/src/freenet/io/xfer/PacketThrottle.java
===================================================================
--- trunk/freenet/src/freenet/io/xfer/PacketThrottle.java       2008-03-14 
18:13:46 UTC (rev 18535)
+++ trunk/freenet/src/freenet/io/xfer/PacketThrottle.java       2008-03-14 
19:41:34 UTC (rev 18536)
@@ -44,12 +44,17 @@
        private boolean slowStart = true;
        /** Total packets in flight, including waiting for bandwidth from the 
central throttle. */
        private int _packetsInFlight;
-       /** Incremented on each send */
+       /** Incremented on each send; the sequence number of the packet last 
added to the window/sent */
        private long _packetSeq;
        /** Last time (seqno) the window was full */
        private long _packetSeqWindowFull;
        /** Last time (seqno) we checked whether the window was full, or 
dropped a packet. */
        private long _packetSeqWindowFullChecked;
+       /** Holds the next number to be used for fifo packet pre-sequence 
numbers */
+       private long _packetTicketGenerator;
+       /** The number of would-be packets which are no longer waiting in line 
for the transmition window */
+       private long _abandonedTickets;
+       
        private static boolean logMINOR;
        private PacketThrottle _deprecatedFor;

@@ -144,9 +149,12 @@
                long bootID = peer.getBootID();
                synchronized(this) {
                        logMINOR = Logger.shouldLog(Logger.MINOR, this);
+                       long thisTicket=_packetTicketGenerator++;
                        while(true) {
                                int windowSize = (int) getWindowSize();
-                               if(_packetsInFlight < windowSize) {
+                               boolean 
wereNext=(_packetSeq==(thisTicket-_abandonedTickets));
+                               //If there is room for it in the window, break 
and send it immeadiately
+                               if(_packetsInFlight < windowSize && wereNext) {
                                        _packetsInFlight++;
                                        _packetSeq++;
                                        if(windowSize == _packetsInFlight) {
@@ -156,20 +164,24 @@
                                        if(logMINOR) Logger.minor(this, 
"Sending, window size now "+windowSize+" packets in flight "+_packetsInFlight+" 
for "+this);
                                        break;
                                }
-                               if(logMINOR) Logger.minor(this, "Window size: 
"+windowSize+" packets in flight "+_packetsInFlight+" for "+this);
+                               long 
waitingBehind=thisTicket-_abandonedTickets-_packetSeq;
+                               if(logMINOR) Logger.minor(this, "Window size: 
"+windowSize+" packets in flight "+_packetsInFlight+", "+waitingBehind+" in 
front of this thread for "+this);
                                long now = System.currentTimeMillis();
                                int waitFor = (int)Math.min(Integer.MAX_VALUE, 
deadline - now);
                                if(waitFor <= 0) {
                                        // Double-check.
                                        if(!peer.isConnected()) {
                                                Logger.error(this, "Not 
notified of disconnection before timeout");
+                                               _abandonedTickets++;
                                                throw new 
NotConnectedException();
                                        }
                                        if(bootID != peer.getBootID()) {
                                                Logger.error(this, "Not 
notified of reconnection before timeout");
+                                               _abandonedTickets++;
                                                throw new 
NotConnectedException();
                                        }
                                        Logger.error(this, "Unable to send 
throttled message, waited "+(now-start)+"ms");
+                                       _abandonedTickets++;
                                        throw new WaitedTooLongException();
                                }
                                try {
@@ -177,9 +189,16 @@
                                } catch (InterruptedException e) {
                                        // Ignore
                                }
-                               if(!peer.isConnected()) throw new 
NotConnectedException();
-                               if(bootID != peer.getBootID()) throw new 
NotConnectedException();
+                               if(!peer.isConnected()) {
+                                       _abandonedTickets++;
+                                       throw new NotConnectedException();
+                               }
+                               if(bootID != peer.getBootID()) {
+                                       _abandonedTickets++;
+                                       throw new NotConnectedException();
+                               }
                                if(_deprecatedFor != null) {
+                                       _abandonedTickets++;
                                        throw new 
ThrottleDeprecatedException(_deprecatedFor);
                                }
                        }


Reply via email to