Author: robert
Date: 2008-03-14 19:41:34 +0000 (Fri, 14 Mar 2008)
New Revision: 18536
Modified:
trunk/freenet/src/freenet/io/xfer/PacketThrottle.java
Log:
grab transmit window space in FIFO order (possibly prevent starvation)
Modified: trunk/freenet/src/freenet/io/xfer/PacketThrottle.java
===================================================================
--- trunk/freenet/src/freenet/io/xfer/PacketThrottle.java 2008-03-14
18:13:46 UTC (rev 18535)
+++ trunk/freenet/src/freenet/io/xfer/PacketThrottle.java 2008-03-14
19:41:34 UTC (rev 18536)
@@ -44,12 +44,17 @@
private boolean slowStart = true;
/** Total packets in flight, including waiting for bandwidth from the
central throttle. */
private int _packetsInFlight;
- /** Incremented on each send */
+ /** Incremented on each send; the sequence number of the packet last
added to the window/sent */
private long _packetSeq;
/** Last time (seqno) the window was full */
private long _packetSeqWindowFull;
/** Last time (seqno) we checked whether the window was full, or
dropped a packet. */
private long _packetSeqWindowFullChecked;
+ /** Holds the next number to be used for FIFO packet pre-sequence
numbers */
+ private long _packetTicketGenerator;
+ /** The number of would-be packets which are no longer waiting in line
for the transmission window */
+ private long _abandonedTickets;
+
private static boolean logMINOR;
private PacketThrottle _deprecatedFor;
@@ -144,9 +149,12 @@
long bootID = peer.getBootID();
synchronized(this) {
logMINOR = Logger.shouldLog(Logger.MINOR, this);
+ long thisTicket=_packetTicketGenerator++;
while(true) {
int windowSize = (int) getWindowSize();
- if(_packetsInFlight < windowSize) {
+ boolean
wereNext=(_packetSeq==(thisTicket-_abandonedTickets));
+ //If there is room for it in the window, break
and send it immediately
+ if(_packetsInFlight < windowSize && wereNext) {
_packetsInFlight++;
_packetSeq++;
if(windowSize == _packetsInFlight) {
@@ -156,20 +164,24 @@
if(logMINOR) Logger.minor(this,
"Sending, window size now "+windowSize+" packets in flight "+_packetsInFlight+"
for "+this);
break;
}
- if(logMINOR) Logger.minor(this, "Window size:
"+windowSize+" packets in flight "+_packetsInFlight+" for "+this);
+ long
waitingBehind=thisTicket-_abandonedTickets-_packetSeq;
+ if(logMINOR) Logger.minor(this, "Window size:
"+windowSize+" packets in flight "+_packetsInFlight+", "+waitingBehind+" in
front of this thread for "+this);
long now = System.currentTimeMillis();
int waitFor = (int)Math.min(Integer.MAX_VALUE,
deadline - now);
if(waitFor <= 0) {
// Double-check.
if(!peer.isConnected()) {
Logger.error(this, "Not
notified of disconnection before timeout");
+ _abandonedTickets++;
throw new
NotConnectedException();
}
if(bootID != peer.getBootID()) {
Logger.error(this, "Not
notified of reconnection before timeout");
+ _abandonedTickets++;
throw new
NotConnectedException();
}
Logger.error(this, "Unable to send
throttled message, waited "+(now-start)+"ms");
+ _abandonedTickets++;
throw new WaitedTooLongException();
}
try {
@@ -177,9 +189,16 @@
} catch (InterruptedException e) {
// Ignore
}
- if(!peer.isConnected()) throw new
NotConnectedException();
- if(bootID != peer.getBootID()) throw new
NotConnectedException();
+ if(!peer.isConnected()) {
+ _abandonedTickets++;
+ throw new NotConnectedException();
+ }
+ if(bootID != peer.getBootID()) {
+ _abandonedTickets++;
+ throw new NotConnectedException();
+ }
if(_deprecatedFor != null) {
+ _abandonedTickets++;
throw new
ThrottleDeprecatedException(_deprecatedFor);
}
}