Author: toad
Date: 2006-07-07 20:20:09 +0000 (Fri, 07 Jul 2006)
New Revision: 9497
Modified:
trunk/freenet/src/freenet/io/comm/DMT.java
trunk/freenet/src/freenet/io/comm/LowLevelFilter.java
trunk/freenet/src/freenet/io/comm/UdpSocketManager.java
trunk/freenet/src/freenet/io/xfer/BlockTransmitter.java
trunk/freenet/src/freenet/node/CHKInsertSender.java
trunk/freenet/src/freenet/node/FNPPacketMangler.java
trunk/freenet/src/freenet/node/InsertHandler.java
trunk/freenet/src/freenet/node/KeyTracker.java
trunk/freenet/src/freenet/node/LocationManager.java
trunk/freenet/src/freenet/node/MessageItem.java
trunk/freenet/src/freenet/node/Node.java
trunk/freenet/src/freenet/node/NodeDispatcher.java
trunk/freenet/src/freenet/node/PacketSender.java
trunk/freenet/src/freenet/node/PeerManager.java
trunk/freenet/src/freenet/node/PeerNode.java
trunk/freenet/src/freenet/node/RequestHandler.java
trunk/freenet/src/freenet/node/SSKInsertHandler.java
trunk/freenet/src/freenet/node/SSKInsertSender.java
trunk/freenet/src/freenet/node/SendMessageOnErrorCallback.java
trunk/freenet/src/freenet/node/Version.java
trunk/freenet/src/freenet/support/BitArray.java
trunk/freenet/src/freenet/support/DoubleTokenBucket.java
trunk/freenet/src/freenet/support/TokenBucket.java
trunk/freenet/src/freenet/support/math/TimeDecayingRunningAverage.java
Log:
862: New bandwidth limiting algorithm, and related changes. Not fully tested
yet.
Modified: trunk/freenet/src/freenet/io/comm/DMT.java
===================================================================
--- trunk/freenet/src/freenet/io/comm/DMT.java 2006-07-07 19:29:24 UTC (rev
9496)
+++ trunk/freenet/src/freenet/io/comm/DMT.java 2006-07-07 20:20:09 UTC (rev
9497)
@@ -172,6 +172,11 @@
return msg;
}
+ public static int packetTransmitSize(int size, int _packets) {
+ return size + 8 /* uid */ + 4 /* packet# */ +
+ BitArray.serializedLength(_packets) + 4 /* Message
header */;
+ }
+
public static final MessageType allSent = new MessageType("allSent") {{
addField(UID, Long.class);
}};
Modified: trunk/freenet/src/freenet/io/comm/LowLevelFilter.java
===================================================================
--- trunk/freenet/src/freenet/io/comm/LowLevelFilter.java 2006-07-07
19:29:24 UTC (rev 9496)
+++ trunk/freenet/src/freenet/io/comm/LowLevelFilter.java 2006-07-07
20:20:09 UTC (rev 9497)
@@ -21,20 +21,9 @@
*/
void process(byte[] buf, int offset, int length, Peer peer);
+ // Outgoing packets are handled elsewhere...
+
/**
- * Process an outgoing packet. Takes a byte[], which is an
- * encoded message. Then does whatever encryption or other
- * things need doing, and calls UdpSocketManager.sendPacket(...)
- * to send the processed data.
- * @param buf The buffer to read from.
- * @param offset The offset to start reading from.
- * @param length The length in bytes to read.
- * @param peer The PeerContext the messages will be sent to.
- * @throws PacketSequenceException
- */
- void processOutgoing(byte[] buf, int offset, int length, PeerContext peer)
throws NotConnectedException, LowLevelFilterException;
-
- /**
* Is the given connection closed?
*/
boolean isDisconnected(PeerContext context);
Modified: trunk/freenet/src/freenet/io/comm/UdpSocketManager.java
===================================================================
--- trunk/freenet/src/freenet/io/comm/UdpSocketManager.java 2006-07-07
19:29:24 UTC (rev 9496)
+++ trunk/freenet/src/freenet/io/comm/UdpSocketManager.java 2006-07-07
20:20:09 UTC (rev 9497)
@@ -503,7 +503,7 @@
// } else {
// sendPacket(blockToSend, destination.getPeer());
// }
- ((PeerNode)destination).sendAsync(m, null);
+ ((PeerNode)destination).sendAsync(m, null, 0);
}
/**
Modified: trunk/freenet/src/freenet/io/xfer/BlockTransmitter.java
===================================================================
--- trunk/freenet/src/freenet/io/xfer/BlockTransmitter.java 2006-07-07
19:29:24 UTC (rev 9496)
+++ trunk/freenet/src/freenet/io/xfer/BlockTransmitter.java 2006-07-07
20:20:09 UTC (rev 9497)
@@ -29,8 +29,10 @@
import freenet.io.comm.NotConnectedException;
import freenet.io.comm.PeerContext;
import freenet.io.comm.UdpSocketManager;
+import freenet.node.FNPPacketMangler;
import freenet.node.PeerNode;
import freenet.support.BitArray;
+import freenet.support.DoubleTokenBucket;
import freenet.support.Logger;
/**
@@ -52,116 +54,17 @@
boolean failedByOverload = false;
final PacketThrottle throttle;
long timeAllSent = -1;
+ final DoubleTokenBucket _masterThrottle;
+ final int PACKET_SIZE;
- // FIXME make this stuff non-static. Have a context object for limiting.
-
- // Static stuff for global bandwidth limiter
- /** Synchronization object for bandwidth limiting */
- static final Object lastPacketSendTimeSync = new Object();
- // Use nanosecond long values for accuracy reasons
- /** Time at which the last known packet is scheduled to be sent.
- * We will not send another packet until at least minPacketDelay ms
after this time. */
- static long hardLastPacketSendTimeNSec = System.currentTimeMillis() *
1000*1000;
- /** Minimum interval between packet sends, for overall hard bandwidth
limiter */
- static int minPacketDelayNSec = 0;
- /** Minimum average interval between packet sends, for averaged (soft)
overall
- * bandwidth usage limiter. */
- static int minSoftDelayNSec = 0;
- /** "Soft" equivalent to hardLastPacketSendTime. Can lag up to half the
softLimitPeriod
- * behind the current time. Otherwise is similar. This gives it
flexibility; we can have
- * spurts above the average limit, but over the softLimitPeriod, it
will average out to
- * the target minSoftDelay. */
- static long softLastPacketSendTimeNSec = System.currentTimeMillis() *
1000*1000;
- /** Period over which the soft limiter should work */
- static long softLimitPeriodNSec;
-
- /**
- * Set the hard bandwidth limiter.
- * @param bytesPerSecond The maximum number of bytes (of data blocks)
to be sent in any
- * one second.
- */
- public static void setHardBandwidthLimit(int bytesPerSecond) {
- int newMinPacketDelayNS =
convertBytesPerSecondToNanosPerPacket(bytesPerSecond);
- synchronized(lastPacketSendTimeSync) {
- if(minPacketDelayNSec != newMinPacketDelayNS) {
- minPacketDelayNSec = newMinPacketDelayNS;
- hardLastPacketSendTimeNSec =
System.currentTimeMillis() * 1000*1000;
- }
- }
- }
-
- public static int convertBytesPerSecondToNanosPerPacket(int
bytesPerSecond) {
- if(bytesPerSecond <= 0)
- return 0; // no limits
-
- int packetSize = getPacketSize();
- double minNanoSecondsBetweenPackets =
- ((1000.0*1000.0*1000.0) * packetSize) / ((double)
bytesPerSecond);
- int newMinPacketDelayNS = (int) minNanoSecondsBetweenPackets;
- double inaccuracy = minNanoSecondsBetweenPackets -
newMinPacketDelayNS;
- double inaccuracyPercent = (inaccuracy /
minNanoSecondsBetweenPackets) * 100.0;
- Logger.minor(BlockTransmitter.class, "Quantization inaccuracy:
"+inaccuracyPercent+"%");
- return newMinPacketDelayNS;
- }
-
- public static int convertBytesPerPeriodToNanosPerPacket(int
bytesPerSecond, long periodLengthNanos) {
- if(bytesPerSecond <= 0)
- return 0; // no limits
-
- int packetSize = getPacketSize();
- double minNanoSecondsBetweenPackets =
- (periodLengthNanos * packetSize) / ((double)
bytesPerSecond);
- int newMinPacketDelayNS = (int) minNanoSecondsBetweenPackets;
- double inaccuracy = minNanoSecondsBetweenPackets -
newMinPacketDelayNS;
- double inaccuracyPercent = (inaccuracy /
minNanoSecondsBetweenPackets) * 100.0;
- Logger.minor(BlockTransmitter.class, "Quantization inaccuracy:
"+inaccuracyPercent+"%");
- return newMinPacketDelayNS;
- }
-
- public static int convertNanosPerPacketToBytesPerSecond(int delay) {
- if(delay == 0) return 0;
- return (int) (((1000.0*1000.0*1000.0) * getPacketSize()) /
((double)delay));
- }
-
- /** @return The average packet size for a block sent by a
BlockTransmitter, including all
- * headers and protocol overhead */
- private static int getPacketSize() {
- // FIXME make this more accurate!
- return 1024 + 200;
- }
-
- public static int getHardBandwidthLimit() {
- int delay;
- synchronized(lastPacketSendTimeSync) {
- delay = minPacketDelayNSec;
- }
- return convertNanosPerPacketToBytesPerSecond(delay);
- }
-
- /**
- * Set the long-term bandwidth limiter.
- * @param bytes The number of bytes to allow at most over the period.
(in data packets)
- * @param period The length of time over which the limit should apply.
(ms)
- */
- public static void setSoftBandwidthLimit(int bytes, long period) {
- if(period > Long.MAX_VALUE / (1000*1000)) throw new
IllegalArgumentException("Period too long");
- int newSoftLimit = convertBytesPerPeriodToNanosPerPacket(bytes,
period);
- period = period * 1000 * 1000;
- synchronized(lastPacketSendTimeSync) {
- minSoftDelayNSec = newSoftLimit;
- softLimitPeriodNSec = period;
- long nowNS = System.currentTimeMillis() * 1000 * 1000;
- if(nowNS - softLastPacketSendTimeNSec > period / 2) {
- softLastPacketSendTimeNSec = nowNS - (period /
2);
- }
- }
- }
-
- public BlockTransmitter(UdpSocketManager usm, PeerContext destination,
long uid, PartiallyReceivedBlock source) {
+ public BlockTransmitter(UdpSocketManager usm, PeerContext destination,
long uid, PartiallyReceivedBlock source, DoubleTokenBucket masterThrottle) {
_usm = usm;
_destination = destination;
_uid = uid;
_prb = source;
+ this._masterThrottle = masterThrottle;
+ PACKET_SIZE = DMT.packetTransmitSize(_prb._packetSize,
_prb._packets)
+ + FNPPacketMangler.HEADERS_LENGTH_ONE_MESSAGE;
try {
_sentPackets = new BitArray(_prb.getNumPackets());
} catch (AbortedException e) {
@@ -177,7 +80,7 @@
long startCycleTime =
System.currentTimeMillis();
try {
while (true) {
-
synchronized(_unsent) {
+
synchronized(_senderThread) {
if(_unsent.size() != 0) break;
// No
unsent packets
if(getNumSent() == _prb.getNumPackets()) {
@@ -185,9 +88,7 @@
if(timeAllSent <= 0)
timeAllSent = System.currentTimeMillis();
}
- }
-
if(_sendComplete) return;
- synchronized
(_senderThread) {
+
if(_sendComplete) return;
_senderThread.wait(10*1000);
}
}
@@ -200,35 +101,25 @@
}
return;
}
- long startDelayTime =
System.currentTimeMillis();
- delay(startCycleTime);
int packetNo;
try {
- synchronized(_unsent) {
+
synchronized(_senderThread) {
packetNo =
((Integer) _unsent.removeFirst()).intValue();
}
} catch (NoSuchElementException
nsee) {
// back up to the top
to check for completion
continue;
}
+ delay(startCycleTime);
_sentPackets.setBit(packetNo,
true);
try {
- long endDelayTime =
System.currentTimeMillis();
- long delayTime =
endDelayTime - startDelayTime;
-
((PeerNode)_destination).reportThrottledPacketSendTime(delayTime);
-
((PeerNode)_destination).sendAsync(DMT.createPacketTransmit(_uid, packetNo,
_sentPackets, _prb.getPacket(packetNo)), null);
- // May have been delays
in sending, so update to avoid sending more frequently than allowed
- long nowNS =
System.currentTimeMillis() * 1000 * 1000;
-
synchronized(lastPacketSendTimeSync) {
-
if(hardLastPacketSendTimeNSec < nowNS)
-
hardLastPacketSendTimeNSec = nowNS;
- }
+
((PeerNode)_destination).sendAsync(DMT.createPacketTransmit(_uid, packetNo,
_sentPackets, _prb.getPacket(packetNo)), null, PACKET_SIZE);
// We accelerate the ping rate
during the transfer to keep a closer eye on round-trip-time
sentSinceLastPing++;
if (sentSinceLastPing >=
PING_EVERY) {
sentSinceLastPing = 0;
//_usm.send(BlockTransmitter.this._destination, DMT.createPing());
-
((PeerNode)_destination).sendAsync(DMT.createPing(), null);
+
((PeerNode)_destination).sendAsync(DMT.createPing(), null, 0);
}
} catch (NotConnectedException
e) {
Logger.normal(this,
"Terminating send: "+e);
@@ -247,101 +138,33 @@
}
/** @return True if _sendComplete */
- private boolean delay(long startCycleTime) {
+ private void delay(long startCycleTime) {
// Get the current inter-packet delay
long delay = throttle.getDelay();
Logger.minor(this, "Throttle delay: "+delay);
- while(true) {
-
- long nowNS = System.currentTimeMillis()
* 1000 * 1000;
-
- Logger.minor(this,
"Now="+nowNS/(1000*1000));
-
- long endTime = -1;
-
- boolean thenSend = true;
-
- // Synchronize on the static lock, and
update
- synchronized(lastPacketSendTimeSync) {
-
- // Get the current time
- nowNS =
System.currentTimeMillis() * 1000 * 1000;
-
- // Update time if necessary to
avoid spurts
- if(hardLastPacketSendTimeNSec <
(nowNS - minPacketDelayNSec)) {
- Logger.minor(this,
"Updating hard limit counter - was "+hardLastPacketSendTimeNSec/1000);
-
hardLastPacketSendTimeNSec = nowNS - minPacketDelayNSec;
- Logger.minor(this,
"Updated hard limit counter - now "+hardLastPacketSendTimeNSec/1000);
+ long startThrottle = System.currentTimeMillis();
+ _masterThrottle.blockingGrab(PACKET_SIZE);
+
+ long now = System.currentTimeMillis();
+
+ long delayTime = now - startThrottle;
+
+
((PeerNode)_destination).reportThrottledPacketSendTime(delayTime);
+
+ long end = startCycleTime + delay;
+
+ if(now > end) return;
+ while(now < end) {
+ long l = end - now;
+ int x = (int) (Math.min(l,
Integer.MAX_VALUE));
+ if(x > 0)
+ try {
+ Thread.sleep(x);
+ } catch (InterruptedException
e) {
+ // Ignore
}
-
- // Wait until the next send
window
- long
newHardLastPacketSendTimeNS =
-
hardLastPacketSendTimeNSec + minPacketDelayNSec;
-
- Logger.minor(this, "Waiting for
"+minPacketDelayNSec+" until "+newHardLastPacketSendTimeNS/1000);
-
- long newHardLastPacketSendTime =
-
newHardLastPacketSendTimeNS / (1000 * 1000);
-
- long earliestSendTime =
startCycleTime + delay;
-
- Logger.minor(this, "Earliest
time by hard limit: "+newHardLastPacketSendTime);
- Logger.minor(this, "Earliest
time by throttle: "+earliestSendTime);
-
- if(earliestSendTime >
newHardLastPacketSendTime) {
- // Don't clog up other
senders!
- thenSend = false;
- endTime =
earliestSendTime;
- Logger.minor(this,
"Looping");
- } else {
-
hardLastPacketSendTimeNSec = newHardLastPacketSendTimeNS;
- endTime =
hardLastPacketSendTimeNSec / (1000 * 1000);
-
- // What about the soft
limit?
-
- // We can only
accumulate burst traffic rights for a full period at most.
- // If we have a period
of 1 hour, and we send no traffic in the first 30 minutes,
- // then we can use up
our whole hour's quota in the next 30 minutes if we need to.
- // We could even use
our entire quota in the last 5 minutes. After that, we can
- // only send at the
limit (which may be very low), since we have no quota left.
- // However, after 1
hour we forget our burst rights.
- if(nowNS -
softLastPacketSendTimeNSec > softLimitPeriodNSec) {
-
softLastPacketSendTimeNSec = nowNS - (softLimitPeriodNSec);
-
Logger.minor(this, "Updating soft limit");
- }
-
-
softLastPacketSendTimeNSec += minSoftDelayNSec;
-
-
if(softLastPacketSendTimeNSec > hardLastPacketSendTimeNSec) {
- endTime =
((hardLastPacketSendTimeNSec = softLastPacketSendTimeNSec) / (1000 * 1000));
-
Logger.minor(this, "endTime now "+endTime+" because of soft limit");
- }
- }
- }
-
- long now = nowNS / (1000 * 1000);
-
- while(now < endTime) {
- synchronized(_senderThread) {
- if(_sendComplete)
- return true;
- try {
-
_senderThread.wait(endTime - now);
- } catch
(InterruptedException e) {
- // Ignore
- }
- }
- if(_sendComplete)
- return true;
- now =
System.currentTimeMillis();
- }
- Logger.minor(this, "Completed wait at
"+now);
-
- nowNS = now * 1000 * 1000;
-
- if(thenSend) return false;
}
}
};
@@ -362,16 +185,16 @@
_unsent = _prb.addListener(myListener = new
PartiallyReceivedBlock.PacketReceivedListener() {;
public void packetReceived(int packetNo) {
- _unsent.addLast(new Integer(packetNo));
- _sentPackets.setBit(packetNo, false);
synchronized(_senderThread) {
+ _unsent.addLast(new Integer(packetNo));
+ _sentPackets.setBit(packetNo, false);
_senderThread.notify();
}
}
public void receiveAborted(int reason, String
description) {
try {
-
((PeerNode)_destination).sendAsync(DMT.createSendAborted(_uid, reason,
description), null);
+
((PeerNode)_destination).sendAsync(DMT.createSendAborted(_uid, reason,
description), null, 0);
} catch (NotConnectedException e) {
Logger.minor(this, "Receive aborted and receiver is not
connected");
}
@@ -427,13 +250,11 @@
for (Iterator i = missing.iterator();
i.hasNext();) {
Integer packetNo = (Integer) i.next();
if
(_prb.isReceived(packetNo.intValue())) {
- synchronized(_unsent) {
+ synchronized(_senderThread) {
_unsent.addFirst(packetNo);
- }
-
_sentPackets.setBit(packetNo.intValue(), false);
- synchronized(_senderThread) {
+
_sentPackets.setBit(packetNo.intValue(), false);
_senderThread.notify();
- }
+ }
}
}
} else if (msg.getSpec().equals(DMT.allReceived)) {
@@ -508,45 +329,5 @@
public PeerContext getDestination() {
return _destination;
}
-
- public static boolean isUncontended() {
- long nowNS = System.currentTimeMillis() * 1000 * 1000;
-
- // Synchronize on the static lock, and update
- synchronized(lastPacketSendTimeSync) {
-
- // Get the current time
- nowNS = System.currentTimeMillis() * 1000 * 1000;
-
- // Update time if necessary to avoid spurts
- if(hardLastPacketSendTimeNSec < (nowNS -
minPacketDelayNSec)) {
- // Can send immediately!
- } else {
- Logger.minor(BlockTransmitter.class, "Is
contended (hard limit)");
- return false; // is contended.
- }
-
- // What about the soft limit?
-
- // We can only accumulate burst traffic rights for a
full period at most.
- // If we have a period of 1 hour, and we send no
traffic in the first 30 minutes,
- // then we can use up our whole hour's quota in the
next 30 minutes if we need to.
- // We could even use our entire quota in the last 5
minutes. After that, we can
- // only send at the limit (which may be very low),
since we have no quota left.
- // However, after 1 hour we forget our burst rights.
- if(nowNS - softLastPacketSendTimeNSec >
softLimitPeriodNSec) {
- softLastPacketSendTimeNSec = nowNS -
(softLimitPeriodNSec);
- }
-
- //softLastPacketSendTimeNSec += minSoftDelayNSec;
-
- if(softLastPacketSendTimeNSec + minSoftDelayNSec >
nowNS) {
- Logger.minor(BlockTransmitter.class, "Is
contended (soft limit)");
- // Can't send immediately due to soft limit.
- return false;
- }
- }
-
- return true;
- }
+
}
Modified: trunk/freenet/src/freenet/node/CHKInsertSender.java
===================================================================
--- trunk/freenet/src/freenet/node/CHKInsertSender.java 2006-07-07 19:29:24 UTC
(rev 9496)
+++ trunk/freenet/src/freenet/node/CHKInsertSender.java 2006-07-07 20:20:09 UTC
(rev 9497)
@@ -66,7 +66,7 @@
AwaitingCompletion(PeerNode pn, PartiallyReceivedBlock prb) {
this.pn = pn;
- bt = new BlockTransmitter(node.usm, pn, uid, prb);
+ bt = new BlockTransmitter(node.usm, pn, uid, prb,
node.outputThrottle);
}
void start() {
Modified: trunk/freenet/src/freenet/node/FNPPacketMangler.java
===================================================================
--- trunk/freenet/src/freenet/node/FNPPacketMangler.java 2006-07-07
19:29:24 UTC (rev 9496)
+++ trunk/freenet/src/freenet/node/FNPPacketMangler.java 2006-07-07
20:20:09 UTC (rev 9497)
@@ -36,20 +36,18 @@
static final EntropySource fnpTimingSource = new EntropySource();
static final EntropySource myPacketDataSource = new EntropySource();
static final int RANDOM_BYTES_LENGTH = 12;
- final int HASH_LENGTH;
+ static final int HASH_LENGTH = 32;
+ /** Minimum headers overhead */
+ public static final int HEADERS_LENGTH_MINIMUM =
+ HASH_LENGTH + RANDOM_BYTES_LENGTH + 4 + 6;
+ /** Headers overhead if there is one message and no acks. */
+ public static final int HEADERS_LENGTH_ONE_MESSAGE =
+ HEADERS_LENGTH_MINIMUM + 1; // 1 byte = length of message. rest
is the same.
public FNPPacketMangler(Node node) {
this.node = node;
this.pm = node.peers;
this.usm = node.usm;
- MessageDigest md;
- try {
- md = MessageDigest.getInstance("SHA-256");
- } catch (NoSuchAlgorithmException e) {
- throw new Error(e);
- }
-
- HASH_LENGTH = md.getDigestLength();
}
/**
@@ -88,7 +86,7 @@
if(opn != null) {
Logger.minor(this, "Trying exact match");
- if(length > HASH_LENGTH + RANDOM_BYTES_LENGTH + 4 + 6) {
+ if(length > HEADERS_LENGTH_MINIMUM) {
if(tryProcess(buf, offset, length,
opn.getCurrentKeyTracker())) return;
// Try with old key
if(tryProcess(buf, offset, length,
opn.getPreviousKeyTracker())) return;
@@ -387,14 +385,19 @@
pcfb.blockEncipher(output, 0, output.length);
System.arraycopy(output, 0, data, hash.length+iv.length+2,
output.length);
try {
- usm.sendPacket(data, replyTo, pn.allowLocalAddresses());
+ sendPacket(data, replyTo, pn, 0);
} catch (LocalAddressException e) {
Logger.error(this, "Tried to send auth packet to local
address: "+replyTo+" for "+pn);
}
Logger.minor(this, "Sending auth packet (long) to "+replyTo+" - size
"+data.length+" data length: "+output.length);
}
- /**
+ private void sendPacket(byte[] data, Peer replyTo, PeerNode pn, int
alreadyReportedBytes) throws LocalAddressException {
+ usm.sendPacket(data, replyTo, pn.allowLocalAddresses());
+ node.outputThrottle.forceGrab(data.length - alreadyReportedBytes);
+ }
+
+ /**
* @param i
* @param payload
* @param pn
@@ -801,6 +804,7 @@
public void processOutgoingOrRequeue(MessageItem[] messages, PeerNode pn,
boolean neverWaitForPacketNumber, boolean dontRequeue) {
Logger.minor(this, "processOutgoingOrRequeue "+messages.length+"
messages for "+pn+" ("+neverWaitForPacketNumber+")");
byte[][] messageData = new byte[messages.length][];
+ int[] alreadyReported = new int[messages.length];
int length = 1;
int callbacksCount = 0;
int x = 0;
@@ -816,7 +820,7 @@
return;
}
int packetNumber =
kt.allocateOutgoingPacketNumberNeverBlock();
- this.processOutgoingPreformatted(buf, 0, buf.length,
pn.getCurrentKeyTracker(), packetNumber, mi.cb);
+ this.processOutgoingPreformatted(buf, 0, buf.length,
pn.getCurrentKeyTracker(), packetNumber, mi.cb, mi.alreadyReportedBytes);
} catch (NotConnectedException e) {
Logger.minor(this, "Caught "+e+" while sending messages,
requeueing");
// Requeue
@@ -845,7 +849,9 @@
}
} else {
byte[] data = mi.getData(this, pn);
- messageData[x++] = data;
+ messageData[x] = data;
+ alreadyReported[x] = mi.alreadyReportedBytes;
+ x++;
if(mi.cb != null) callbacksCount += mi.cb.length;
Logger.minor(this, "Sending: "+mi+" length "+data.length+" cb
"+mi.cb);
length += (data.length + 2);
@@ -858,9 +864,11 @@
}
AsyncMessageCallback callbacks[] = new
AsyncMessageCallback[callbacksCount];
x=0;
+ int alreadyReportedBytes = 0;
for(int i=0;i<messages.length;i++) {
if(messages[i].formatted) continue;
if(messages[i].cb != null) {
+ alreadyReportedBytes += messages[i].alreadyReportedBytes;
System.arraycopy(messages[i].cb, 0, callbacks, x,
messages[i].cb.length);
x += messages[i].cb.length;
}
@@ -870,7 +878,7 @@
if(length < node.usm.getMaxPacketSize() &&
messages.length < 256) {
try {
- innerProcessOutgoing(messageData, 0, messageData.length,
length, pn, neverWaitForPacketNumber, callbacks);
+ innerProcessOutgoing(messageData, 0, messageData.length,
length, pn, neverWaitForPacketNumber, callbacks, alreadyReportedBytes);
} catch (NotConnectedException e) {
Logger.normal(this, "Caught "+e+" while sending messages,
requeueing");
// Requeue
@@ -895,6 +903,7 @@
length = 56;
int count = 0;
int lastIndex = 0;
+ alreadyReportedBytes = 0;
for(int i=0;i<=messages.length;i++) {
int thisLength;
if(i == messages.length) thisLength = 0;
@@ -910,7 +919,7 @@
// e.g. lastIndex = 0, i = 1, we just send message 0
if(lastIndex != i) {
try {
- innerProcessOutgoing(messageData, lastIndex,
i-lastIndex, length, pn, neverWaitForPacketNumber, callbacks);
+ innerProcessOutgoing(messageData, lastIndex,
i-lastIndex, length, pn, neverWaitForPacketNumber, callbacks,
alreadyReportedBytes);
} catch (NotConnectedException e) {
Logger.normal(this, "Caught "+e+" while sending
messages, requeueing remaining messages");
// Requeue
@@ -935,7 +944,10 @@
if(i != messages.length)
length = 1 + (messageData[i].length + 2);
count = 0;
- } else length = newLength;
+ } else {
+ length = newLength;
+ alreadyReportedBytes += alreadyReported[i];
+ }
}
}
}
@@ -949,7 +961,8 @@
* @param pn Node to send the messages to.
* @throws PacketSequenceException
*/
- private void innerProcessOutgoing(byte[][] messageData, int start, int
length, int bufferLength, PeerNode pn, boolean neverWaitForPacketNumber,
AsyncMessageCallback[] callbacks) throws NotConnectedException,
WouldBlockException, PacketSequenceException {
+ private void innerProcessOutgoing(byte[][] messageData, int start, int
length, int bufferLength,
+ PeerNode pn, boolean neverWaitForPacketNumber,
AsyncMessageCallback[] callbacks, int alreadyReportedBytes) throws
NotConnectedException, WouldBlockException, PacketSequenceException {
Logger.minor(this,
"innerProcessOutgoing(...,"+start+","+length+","+bufferLength+")");
byte[] buf = new byte[bufferLength];
buf[0] = (byte)length;
@@ -962,7 +975,7 @@
System.arraycopy(data, 0, buf, loc, len);
loc += len;
}
- processOutgoingPreformatted(buf, 0, bufferLength, pn,
neverWaitForPacketNumber, callbacks);
+ processOutgoingPreformatted(buf, 0, bufferLength, pn,
neverWaitForPacketNumber, callbacks, alreadyReportedBytes);
}
/**
@@ -971,13 +984,13 @@
* @throws PacketSequenceException
* @throws WouldBlockException
*/
- public void processOutgoing(byte[] buf, int offset, int length,
PeerContext peer) throws NotConnectedException, PacketSequenceException,
WouldBlockException {
+ public void processOutgoing(byte[] buf, int offset, int length,
PeerContext peer, int alreadyReportedBytes) throws NotConnectedException,
PacketSequenceException, WouldBlockException {
Logger.minor(this, "processOutgoing(buf, "+offset+", "+length+",
"+peer.getPeer());
if(!(peer instanceof PeerNode))
throw new IllegalArgumentException();
PeerNode pn = (PeerNode)peer;
byte[] newBuf = preformat(buf, offset, length);
- processOutgoingPreformatted(newBuf, 0, newBuf.length, pn, -1, null);
+ processOutgoingPreformatted(newBuf, 0, newBuf.length, pn, -1, null,
alreadyReportedBytes);
}
@@ -987,29 +1000,20 @@
* @throws PacketSequenceException
* @throws WouldBlockException
*/
- public void processOutgoing(byte[] buf, int offset, int length, KeyTracker
tracker) throws KeyChangedException, NotConnectedException,
PacketSequenceException, WouldBlockException {
+ public void processOutgoing(byte[] buf, int offset, int length, KeyTracker
tracker, int alreadyReportedBytes) throws KeyChangedException,
NotConnectedException, PacketSequenceException, WouldBlockException {
byte[] newBuf = preformat(buf, offset, length);
- processOutgoingPreformatted(newBuf, 0, newBuf.length, tracker, -1,
null);
+ processOutgoingPreformatted(newBuf, 0, newBuf.length, tracker, -1,
null, alreadyReportedBytes);
}
/**
- * Send a packet, with a packet number.
- * @throws PacketSequenceException
- * @throws WouldBlockException If allocating a packet number would have
blocked.
- */
- public void processOutgoing(byte[] buf, int offset, int length, KeyTracker
tracker, int packetNo, AsyncMessageCallback[] callbacks) throws
KeyChangedException, NotConnectedException, PacketSequenceException,
WouldBlockException {
- byte[] newBuf = preformat(buf, offset, length);
- processOutgoingPreformatted(newBuf, 0, newBuf.length, tracker,
packetNo, callbacks);
- }
-
- /**
* Send a packet using the current key. Retry if it fails solely because
* the key changes.
* @throws PacketSequenceException
* @throws WouldBlockException
*/
- void processOutgoingPreformatted(byte[] buf, int offset, int length,
PeerNode peer, int k, AsyncMessageCallback[] callbacks) throws
NotConnectedException, PacketSequenceException, WouldBlockException {
+ void processOutgoingPreformatted(byte[] buf, int offset, int length,
PeerNode peer, int k, AsyncMessageCallback[] callbacks,
+ int alreadyReportedBytes) throws NotConnectedException,
PacketSequenceException, WouldBlockException {
while(true) {
try {
Logger.minor(this, "At beginning of processOutgoingPreformatted
loop for "+peer.getPeer());
@@ -1018,7 +1022,7 @@
Logger.normal(this, "Dropping packet: Not connected yet");
throw new NotConnectedException();
}
- processOutgoingPreformatted(buf, offset, length, tracker, k,
callbacks);
+ processOutgoingPreformatted(buf, offset, length, tracker, k,
callbacks, alreadyReportedBytes);
return;
} catch (KeyChangedException e) {
Logger.normal(this, "Key changed");
@@ -1032,7 +1036,7 @@
* the key changes.
* @throws PacketSequenceException
*/
- void processOutgoingPreformatted(byte[] buf, int offset, int length,
PeerNode peer, boolean neverWaitForPacketNumber, AsyncMessageCallback[]
callbacks) throws NotConnectedException, WouldBlockException,
PacketSequenceException {
+ void processOutgoingPreformatted(byte[] buf, int offset, int length,
PeerNode peer, boolean neverWaitForPacketNumber, AsyncMessageCallback[]
callbacks, int alreadyReportedBytes) throws NotConnectedException,
WouldBlockException, PacketSequenceException {
while(true) {
try {
KeyTracker tracker = peer.getCurrentKeyTracker();
@@ -1042,7 +1046,7 @@
}
int seqNo = neverWaitForPacketNumber ?
tracker.allocateOutgoingPacketNumberNeverBlock() :
tracker.allocateOutgoingPacketNumber();
- processOutgoingPreformatted(buf, offset, length, tracker,
seqNo, callbacks);
+ processOutgoingPreformatted(buf, offset, length, tracker,
seqNo, callbacks, alreadyReportedBytes);
return;
} catch (KeyChangedException e) {
// Go around again
@@ -1086,7 +1090,7 @@
* @throws PacketSequenceException
* @throws WouldBlockException If we cannot allocate a packet number
because it would block.
*/
- public void processOutgoingPreformatted(byte[] buf, int offset, int
length, KeyTracker tracker, int packetNumber, AsyncMessageCallback[] callbacks)
throws KeyChangedException, NotConnectedException, PacketSequenceException,
WouldBlockException {
+ public void processOutgoingPreformatted(byte[] buf, int offset, int
length, KeyTracker tracker, int packetNumber, AsyncMessageCallback[] callbacks,
int alreadyReportedBytes) throws KeyChangedException, NotConnectedException,
PacketSequenceException, WouldBlockException {
if(Logger.shouldLog(Logger.MINOR, this)) {
String log =
"processOutgoingPreformatted("+Fields.hashCode(buf)+",
"+offset+","+length+","+tracker+","+packetNumber+",";
if(callbacks == null) log += "null";
@@ -1229,7 +1233,7 @@
Logger.minor(this, "Sending...");
- processOutgoingFullyFormatted(plaintext, tracker, callbacks);
+ processOutgoingFullyFormatted(plaintext, tracker, callbacks,
alreadyReportedBytes);
Logger.minor(this, "Sent packet");
}
@@ -1238,7 +1242,7 @@
* @param plaintext The packet's plaintext, including all formatting,
* including acks and resend requests. Is clobbered.
*/
- private void processOutgoingFullyFormatted(byte[] plaintext, KeyTracker
kt, AsyncMessageCallback[] callbacks) {
+ private void processOutgoingFullyFormatted(byte[] plaintext, KeyTracker
kt, AsyncMessageCallback[] callbacks, int alreadyReportedBytes) {
BlockCipher sessionCipher = kt.sessionCipher;
Logger.minor(this, "Encrypting with
"+HexUtil.bytesToHex(kt.sessionKey));
if(sessionCipher == null) {
@@ -1298,7 +1302,7 @@
// pn.getPeer() cannot be null
try {
- usm.sendPacket(output, kt.pn.getPeer(),
kt.pn.allowLocalAddresses());
+ sendPacket(output, kt.pn.getPeer(), kt.pn,
alreadyReportedBytes);
} catch (LocalAddressException e) {
Logger.error(this, "Tried to send data packet to local
address: "+kt.pn.getPeer()+" for "+kt.pn.allowLocalAddresses());
}
Modified: trunk/freenet/src/freenet/node/InsertHandler.java
===================================================================
--- trunk/freenet/src/freenet/node/InsertHandler.java 2006-07-07 19:29:24 UTC
(rev 9496)
+++ trunk/freenet/src/freenet/node/InsertHandler.java 2006-07-07 20:20:09 UTC
(rev 9497)
@@ -106,9 +106,9 @@
if(source.isConnected() && startTime >
(source.timeLastConnected()+Node.HANDSHAKE_TIMEOUT*4))
Logger.error(this, "Did not receive DataInsert
on "+uid+" from "+source+" !");
Message tooSlow = DMT.createFNPRejectedTimeout(uid);
- source.sendAsync(tooSlow, null);
+ source.sendAsync(tooSlow, null, 0);
Message m = DMT.createFNPInsertTransfersCompleted(uid,
true);
- source.sendAsync(m, null);
+ source.sendAsync(m, null, 0);
prb = new PartiallyReceivedBlock(Node.PACKETS_IN_BLOCK,
Node.PACKET_SIZE);
br = new BlockReceiver(node.usm, source, uid, prb);
prb.abort(RetrievalException.NO_DATAINSERT, "No
DataInsert");
@@ -314,7 +314,7 @@
boolean failed = sender.anyTransfersFailed();
Message m = DMT.createFNPInsertTransfersCompleted(uid, failed);
try {
- source.sendAsync(m, null);
+ source.sendAsync(m, null, 0);
Logger.minor(this, "Sent completion: "+failed+" for
"+this);
} catch (NotConnectedException e1) {
Logger.minor(this, "Not connected: "+source+" for
"+this);
@@ -344,7 +344,7 @@
}
if(toSend != null) {
try {
- source.sendAsync(toSend, null);
+ source.sendAsync(toSend, null, 0);
} catch (NotConnectedException e) {
// :(
Logger.minor(this, "Lost connection in "+this+" when sending
FNPDataInsertRejected");
Modified: trunk/freenet/src/freenet/node/KeyTracker.java
===================================================================
--- trunk/freenet/src/freenet/node/KeyTracker.java 2006-07-07 19:29:24 UTC
(rev 9496)
+++ trunk/freenet/src/freenet/node/KeyTracker.java 2006-07-07 20:20:09 UTC
(rev 9497)
@@ -842,7 +842,7 @@
AsyncMessageCallback[] callbacks = element.callbacks;
// Ignore packet#
Logger.minor(this, "Queueing resend of what was once
"+element.packetNumber);
- messages[i] = new MessageItem(buf, callbacks, true);
+ messages[i] = new MessageItem(buf, callbacks, true, 0);
}
pn.requeueMessageItems(messages, 0, messages.length, true);
pn.node.ps.queuedResendPacket();
Modified: trunk/freenet/src/freenet/node/LocationManager.java
===================================================================
--- trunk/freenet/src/freenet/node/LocationManager.java 2006-07-07 19:29:24 UTC
(rev 9496)
+++ trunk/freenet/src/freenet/node/LocationManager.java 2006-07-07 20:20:09 UTC
(rev 9497)
@@ -675,7 +675,7 @@
// Reject
Message reject = DMT.createFNPSwapRejected(uid);
try {
- pn.sendAsync(reject, null);
+ pn.sendAsync(reject, null, 0);
} catch (NotConnectedException e) {
Logger.minor(this, "Lost connection to "+pn+" rejecting
SwapRequest");
}
@@ -687,7 +687,7 @@
// Reject
Message reject = DMT.createFNPSwapRejected(uid);
try {
- pn.sendAsync(reject, null);
+ pn.sendAsync(reject, null, 0);
} catch (NotConnectedException e) {
Logger.minor(this, "Lost connection rejecting SwapRequest from
"+pn);
}
@@ -705,7 +705,7 @@
// Reject
Message reject = DMT.createFNPSwapRejected(uid);
try {
- pn.sendAsync(reject, null);
+ pn.sendAsync(reject, null, 0);
} catch (NotConnectedException e1) {
Logger.minor(this, "Lost connection rejecting SwapRequest
(locked) from "+pn);
}
@@ -740,7 +740,7 @@
Logger.minor(this, "Late reject "+uid);
Message reject = DMT.createFNPSwapRejected(uid);
try {
- pn.sendAsync(reject, null);
+ pn.sendAsync(reject, null, 0);
} catch (NotConnectedException e1) {
Logger.normal(this, "Late reject but disconnected from
sender: "+pn);
}
@@ -754,7 +754,7 @@
// Forward the request.
// Note that we MUST NOT send this blocking as we are on
the
// receiver thread.
- randomPeer.sendAsync(m, new
MyCallback(DMT.createFNPSwapRejected(uid), pn, item));
+ randomPeer.sendAsync(m, new
MyCallback(DMT.createFNPSwapRejected(uid), pn, item), 0);
} catch (NotConnectedException e) {
// Try a different node
continue;
@@ -801,7 +801,7 @@
m.set(DMT.UID, item.incomingID);
Logger.minor(this, "Forwarding SwapReply "+uid+" from
"+m.getSource()+" to "+item.requestSender);
try {
- item.requestSender.sendAsync(m, null);
+ item.requestSender.sendAsync(m, null, 0);
} catch (NotConnectedException e) {
Logger.minor(this, "Lost connection forwarding SwapReply "+uid+"
to "+item.requestSender);
}
@@ -833,7 +833,7 @@
// Returning to source - use incomingID
m.set(DMT.UID, item.incomingID);
try {
- item.requestSender.sendAsync(m, null);
+ item.requestSender.sendAsync(m, null, 0);
} catch (NotConnectedException e) {
Logger.minor(this, "Lost connection forwarding SwapRejected
"+uid+" to "+item.requestSender);
}
@@ -860,7 +860,7 @@
// Sending onwards - use outgoing ID
m.set(DMT.UID, item.outgoingID);
try {
- item.routedTo.sendAsync(m, new
SendMessageOnErrorCallback(DMT.createFNPSwapRejected(item.incomingID),
item.requestSender));
+ item.routedTo.sendAsync(m, new
SendMessageOnErrorCallback(DMT.createFNPSwapRejected(item.incomingID),
item.requestSender), 0);
} catch (NotConnectedException e) {
Logger.minor(this, "Lost connection forwarding SwapCommit "+uid+"
to "+item.routedTo);
}
@@ -897,7 +897,7 @@
// Returning to source - use incomingID
m.set(DMT.UID, item.incomingID);
try {
- item.requestSender.sendAsync(m, null);
+ item.requestSender.sendAsync(m, null, 0);
} catch (NotConnectedException e) {
Logger.normal(this, "Lost connection forwarding SwapComplete
"+uid+" to "+item.requestSender);
}
@@ -943,7 +943,7 @@
Message msg = DMT.createFNPSwapRejected(item.incomingID);
Logger.minor(this, "Rejecting in lostOrRestartedNode:
"+item.incomingID+ " from "+item.requestSender);
try {
- item.requestSender.sendAsync(msg, null);
+ item.requestSender.sendAsync(msg, null, 0);
} catch (NotConnectedException e1) {
Logger.normal(this, "Both sender and receiver disconnected for
"+item);
}
Modified: trunk/freenet/src/freenet/node/MessageItem.java
===================================================================
--- trunk/freenet/src/freenet/node/MessageItem.java 2006-07-07 19:29:24 UTC
(rev 9496)
+++ trunk/freenet/src/freenet/node/MessageItem.java 2006-07-07 20:20:09 UTC
(rev 9497)
@@ -9,12 +9,14 @@
byte[] buf;
final AsyncMessageCallback[] cb;
final long submitted;
+ final int alreadyReportedBytes;
/** If true, the buffer may contain several messages, and is formatted
* for sending as a single packet.
*/
final boolean formatted;
- public MessageItem(Message msg2, AsyncMessageCallback[] cb2) {
+ public MessageItem(Message msg2, AsyncMessageCallback[] cb2, int
alreadyReportedBytes) {
+ this.alreadyReportedBytes = alreadyReportedBytes;
this.msg = msg2;
this.cb = cb2;
buf = null;
@@ -22,7 +24,8 @@
this.submitted = System.currentTimeMillis();
}
- public MessageItem(byte[] data, AsyncMessageCallback[] cb2, boolean
formatted) {
+ public MessageItem(byte[] data, AsyncMessageCallback[] cb2, boolean
formatted, int alreadyReportedBytes) {
+ this.alreadyReportedBytes = alreadyReportedBytes;
this.cb = cb2;
this.msg = null;
this.buf = data;
Modified: trunk/freenet/src/freenet/node/Node.java
===================================================================
--- trunk/freenet/src/freenet/node/Node.java 2006-07-07 19:29:24 UTC (rev
9496)
+++ trunk/freenet/src/freenet/node/Node.java 2006-07-07 20:20:09 UTC (rev
9497)
@@ -73,7 +73,6 @@
import freenet.io.comm.PeerParseException;
import freenet.io.comm.UdpSocketManager;
import freenet.io.xfer.AbortedException;
-import freenet.io.xfer.BlockTransmitter;
import freenet.io.xfer.PartiallyReceivedBlock;
import freenet.keys.CHKBlock;
import freenet.keys.CHKVerifyException;
@@ -109,6 +108,7 @@
import freenet.support.Base64;
import freenet.support.Bucket;
import freenet.support.BucketFactory;
+import freenet.support.DoubleTokenBucket;
import freenet.support.Fields;
import freenet.support.FileLoggerHook;
import freenet.support.HexUtil;
@@ -561,6 +561,7 @@
final boolean testnetEnabled;
final TestnetHandler testnetHandler;
final StaticSwapRequestInterval swapInterval;
+ public final DoubleTokenBucket outputThrottle;
static short MAX_HTL = 10;
static final int EXIT_STORE_FILE_NOT_FOUND = 1;
static final int EXIT_STORE_IOEXCEPTION = 2;
@@ -610,7 +611,7 @@
final MyRequestThrottle sskInsertThrottle;
final RequestStarter sskInsertStarter;
public final UserAlertManager alerts;
- final RunningAverage throttledPacketSendAverage;
+ final TimeDecayingRunningAverage throttledPacketSendAverage;
/** Must be included as a hidden field in order for any dangerous HTTP
operation to complete successfully. */
public final String formPassword;
@@ -1013,7 +1014,7 @@
bootID = random.nextLong();
throttledPacketSendAverage =
new TimeDecayingRunningAverage(1, 10*60*1000 /* should
be significantly longer than a typical transfer */, 0, Long.MAX_VALUE);
-
+
buildOldAgeUserAlert = new BuildOldAgeUserAlert();
primaryIPUndetectedAlert = new IPUndetectedUserAlert();
@@ -1204,19 +1205,19 @@
"Output bandwidth limit (bytes per second)",
"Hard output bandwidth limit (bytes/sec); the node should almost never exceed
this",
new IntCallback() {
public int get() {
- return
BlockTransmitter.getHardBandwidthLimit();
+ //return
BlockTransmitter.getHardBandwidthLimit();
+ return (int) ((1000L * 1000L *
1000L) / outputThrottle.getNanosPerTick());
}
public void set(int val) throws
InvalidConfigValueException {
-
BlockTransmitter.setHardBandwidthLimit(val);
+ if(val <= 0) throw new
InvalidConfigValueException("Bandwidth limit must be positive");
+
outputThrottle.changeNanosAndBucketSizes((1000L * 1000L * 1000L) / val, val,
(val * 4) / 5);
}
});
int obwLimit = nodeConfig.getInt("outputBandwidthLimit");
- BlockTransmitter.setHardBandwidthLimit(obwLimit);
+ this.outputThrottle = new DoubleTokenBucket(obwLimit/2,
(1000L*1000L*1000L) / obwLimit, obwLimit, (obwLimit * 4) / 5);
+
// FIXME add an averaging/long-term/soft bandwidth limit. (bug
76)
- // There is already untested support for this in
BlockTransmitter.
- // No long-term limit for now.
- BlockTransmitter.setSoftBandwidthLimit(0, 0);
// SwapRequestInterval
@@ -2168,19 +2169,23 @@
long lastCheckedUncontended = -1;
+ static final int ESTIMATED_SIZE_OF_ONE_THROTTLED_PACKET =
+ 1024 + DMT.packetTransmitSize(1024, 32)
+ + FNPPacketMangler.HEADERS_LENGTH_ONE_MESSAGE;
+
/* return reject reason as string if should reject, otherwise return null
*/
public synchronized String shouldRejectRequest(boolean canAcceptAnyway)
{
long now = System.currentTimeMillis();
- if(now - lastCheckedUncontended > 1000) {
- lastCheckedUncontended = now;
- if(BlockTransmitter.isUncontended()) {
- Logger.minor(this, "Reporting 0 because
throttle uncontended: now "+throttledPacketSendAverage.currentValue());
- throttledPacketSendAverage.report(0);
- Logger.minor(this, "New average:
"+throttledPacketSendAverage.currentValue());
- Logger.minor(this, "Average:
"+throttledPacketSendAverage.toString());
- } else
- Logger.minor(this, "Not uncontended");
+ double bwlimitDelayTime =
throttledPacketSendAverage.currentValue();
+
+ if(throttledPacketSendAverage.lastReportTime() <
System.currentTimeMillis() - 5000) {
+
outputThrottle.blockingGrab(ESTIMATED_SIZE_OF_ONE_THROTTLED_PACKET);
+
outputThrottle.recycle(ESTIMATED_SIZE_OF_ONE_THROTTLED_PACKET);
+ long after = System.currentTimeMillis();
+ throttledPacketSendAverage.report(after - now);
+ now = after;
+ bwlimitDelayTime =
throttledPacketSendAverage.currentValue();
}
// Round trip time
@@ -2202,7 +2207,6 @@
// Bandwidth limited packets
- double bwlimitDelayTime =
this.throttledPacketSendAverage.currentValue();
Logger.minor(this, "bwlimitDelayTime = "+bwlimitDelayTime);
if(bwlimitDelayTime > MAX_THROTTLE_DELAY) {
if(now - lastAcceptedRequest > MAX_INTERREQUEST_TIME &&
canAcceptAnyway) {
Modified: trunk/freenet/src/freenet/node/NodeDispatcher.java
===================================================================
--- trunk/freenet/src/freenet/node/NodeDispatcher.java 2006-07-07 19:29:24 UTC
(rev 9496)
+++ trunk/freenet/src/freenet/node/NodeDispatcher.java 2006-07-07 20:20:09 UTC
(rev 9497)
@@ -45,7 +45,7 @@
// Send an FNPPong
Message reply = DMT.createFNPPong(m.getInt(DMT.PING_SEQNO));
try {
- ((PeerNode)m.getSource()).sendAsync(reply, null); // nothing
we can do if can't contact source
+ ((PeerNode)m.getSource()).sendAsync(reply, null, 0); //
nothing we can do if can't contact source
} catch (NotConnectedException e) {
Logger.minor(this, "Lost connection replying to "+m);
}
@@ -83,7 +83,7 @@
long id = m.getLong(DMT.PING_SEQNO);
Message msg = DMT.createFNPLinkPong(id);
try {
- source.sendAsync(msg, null);
+ source.sendAsync(msg, null, 0);
} catch (NotConnectedException e) {
// Ignore
}
@@ -113,7 +113,7 @@
if(node.recentlyCompleted(id)) {
Message rejected = DMT.createFNPRejectedLoop(id);
try {
- ((PeerNode)(m.getSource())).sendAsync(rejected, null);
+ ((PeerNode)(m.getSource())).sendAsync(rejected, null, 0);
} catch (NotConnectedException e) {
Logger.normal(this, "Rejecting data request (loop, finished):
"+e);
}
@@ -125,7 +125,7 @@
Logger.normal(this, "Rejecting request from
"+m.getSource().getPeer()+" preemptively because "+rejectReason);
Message rejected = DMT.createFNPRejectedOverload(id, true);
try {
- ((PeerNode)(m.getSource())).sendAsync(rejected, null);
+ ((PeerNode)(m.getSource())).sendAsync(rejected, null,
0);
} catch (NotConnectedException e) {
Logger.normal(this, "Rejecting (overload) data request from
"+m.getSource().getPeer()+": "+e);
}
@@ -136,7 +136,7 @@
Logger.minor(this, "Could not lock ID "+id+" -> rejecting (already
running)");
Message rejected = DMT.createFNPRejectedLoop(id);
try {
- ((PeerNode)(m.getSource())).sendAsync(rejected, null);
+ ((PeerNode)(m.getSource())).sendAsync(rejected, null, 0);
} catch (NotConnectedException e) {
Logger.normal(this, "Rejecting insert request from
"+m.getSource().getPeer()+": "+e);
}
@@ -158,7 +158,7 @@
if(node.recentlyCompleted(id)) {
Message rejected = DMT.createFNPRejectedLoop(id);
try {
- ((PeerNode)(m.getSource())).sendAsync(rejected, null);
+ ((PeerNode)(m.getSource())).sendAsync(rejected, null, 0);
} catch (NotConnectedException e) {
Logger.normal(this, "Rejecting insert request from
"+m.getSource().getPeer()+": "+e);
}
@@ -170,7 +170,7 @@
Logger.normal(this, "Rejecting insert from
"+m.getSource().getPeer()+" preemptively because "+rejectReason);
Message rejected = DMT.createFNPRejectedOverload(id, true);
try {
- ((PeerNode)(m.getSource())).sendAsync(rejected, null);
+ ((PeerNode)(m.getSource())).sendAsync(rejected, null,
0);
} catch (NotConnectedException e) {
Logger.normal(this, "Rejecting (overload) insert request from
"+m.getSource().getPeer()+": "+e);
}
@@ -181,7 +181,7 @@
Logger.minor(this, "Could not lock ID "+id+" -> rejecting (already
running)");
Message rejected = DMT.createFNPRejectedLoop(id);
try {
- ((PeerNode)(m.getSource())).sendAsync(rejected, null);
+ ((PeerNode)(m.getSource())).sendAsync(rejected, null, 0);
} catch (NotConnectedException e) {
Logger.normal(this, "Rejecting insert request from
"+m.getSource().getPeer()+": "+e);
}
@@ -269,7 +269,7 @@
ctx = (RoutedContext)routedContexts.get(lid);
if(ctx != null) {
try {
-
((PeerNode)m.getSource()).sendAsync(DMT.createFNPRoutedRejected(id,
(short)(htl-1)), null);
+
((PeerNode)m.getSource()).sendAsync(DMT.createFNPRoutedRejected(id,
(short)(htl-1)), null, 0);
} catch (NotConnectedException e) {
Logger.minor(this, "Lost connection rejecting "+m);
}
@@ -289,7 +289,7 @@
} else if(htl == 0) {
Message reject = DMT.createFNPRoutedRejected(id, (short)0);
if(pn != null) try {
- pn.sendAsync(reject, null);
+ pn.sendAsync(reject, null, 0);
} catch (NotConnectedException e) {
Logger.minor(this, "Lost connection rejecting "+m);
}
@@ -311,7 +311,7 @@
PeerNode pn = ctx.source;
if(pn == null) return false;
try {
- pn.sendAsync(m, null);
+ pn.sendAsync(m, null, 0);
} catch (NotConnectedException e) {
Logger.minor(this, "Lost connection forwarding "+m+" to "+pn);
}
@@ -330,7 +330,7 @@
Logger.minor(this, "Forwarding "+m.getSpec()+" to
"+next.getPeer().getPort());
ctx.addSent(next);
try {
- next.sendAsync(m, null);
+ next.sendAsync(m, null, 0);
} catch (NotConnectedException e) {
continue;
}
@@ -339,7 +339,7 @@
// Reached a dead end...
Message reject = DMT.createFNPRoutedRejected(id, htl);
if(pn != null) try {
- pn.sendAsync(reject, null);
+ pn.sendAsync(reject, null, 0);
} catch (NotConnectedException e) {
Logger.error(this, "Cannot send reject message back to
source "+pn);
return true;
@@ -374,7 +374,7 @@
int x = m.getInt(DMT.COUNTER);
Message reply = DMT.createFNPRoutedPong(id, x);
try {
- src.sendAsync(reply, null);
+ src.sendAsync(reply, null, 0);
} catch (NotConnectedException e) {
Logger.minor(this, "Lost connection replying to "+m+" in
dispatchRoutedMessage");
}
Modified: trunk/freenet/src/freenet/node/PacketSender.java
===================================================================
--- trunk/freenet/src/freenet/node/PacketSender.java 2006-07-07 19:29:24 UTC
(rev 9496)
+++ trunk/freenet/src/freenet/node/PacketSender.java 2006-07-07 20:20:09 UTC
(rev 9497)
@@ -196,7 +196,7 @@
if(item == null) continue;
try {
Logger.minor(this, "Resending
"+item.packetNumber+" to "+item.kt);
-
node.packetMangler.processOutgoingPreformatted(item.buf, 0, item.buf.length,
item.kt, item.packetNumber, item.callbacks);
+
node.packetMangler.processOutgoingPreformatted(item.buf, 0, item.buf.length,
item.kt, item.packetNumber, item.callbacks, 0);
} catch (KeyChangedException e) {
Logger.error(this, "Caught "+e+" resending packets
to "+kt);
pn.requeueResendItems(resendItems);
@@ -250,7 +250,7 @@
// Force packet to have a sequence number.
Message m = DMT.createFNPVoid();
pn.addToLocalNodeSentMessagesToStatistic(m);
- node.packetMangler.processOutgoingOrRequeue(new
MessageItem[] { new MessageItem(m, null) }, pn, true, true);
+ node.packetMangler.processOutgoingOrRequeue(new
MessageItem[] { new MessageItem(m, null, 0) }, pn, true, true);
}
} else {
// Not connected
Modified: trunk/freenet/src/freenet/node/PeerManager.java
===================================================================
--- trunk/freenet/src/freenet/node/PeerManager.java 2006-07-07 19:29:24 UTC
(rev 9496)
+++ trunk/freenet/src/freenet/node/PeerManager.java 2006-07-07 20:20:09 UTC
(rev 9497)
@@ -328,7 +328,7 @@
PeerNode[] peers = connectedPeers; // avoid synchronization
for(int i=0;i<peers.length;i++) {
if(peers[i].isConnected()) try {
- peers[i].sendAsync(msg, null);
+ peers[i].sendAsync(msg, null, 0);
} catch (NotConnectedException e) {
// Ignore
}
Modified: trunk/freenet/src/freenet/node/PeerNode.java
===================================================================
--- trunk/freenet/src/freenet/node/PeerNode.java 2006-07-07 19:29:24 UTC
(rev 9496)
+++ trunk/freenet/src/freenet/node/PeerNode.java 2006-07-07 20:20:09 UTC
(rev 9497)
@@ -687,11 +687,15 @@
/**
* Send a message, off-thread, to this node.
* @param msg The message to be sent.
+ * @param cb The callback to be called when the packet has been sent, or
null.
+ * @param alreadyReportedBytes The number of bytes already reported to the
throttle
+ * relating to this packet (normally set when we have delayed a packet in
order to
+ * throttle it).
*/
- public void sendAsync(Message msg, AsyncMessageCallback cb) throws
NotConnectedException {
+ public void sendAsync(Message msg, AsyncMessageCallback cb, int
alreadyReportedBytes) throws NotConnectedException {
Logger.minor(this, "Sending async: "+msg+" : "+cb+" on "+this);
if(!isConnected) throw new NotConnectedException();
- MessageItem item = new MessageItem(msg, cb == null ? null : new
AsyncMessageCallback[] {cb});
+ MessageItem item = new MessageItem(msg, cb == null ? null : new
AsyncMessageCallback[] {cb}, alreadyReportedBytes);
synchronized(routingBackoffSync) {
reportBackoffStatus(System.currentTimeMillis());
}
@@ -1203,8 +1207,8 @@
Message ipMsg = DMT.createFNPDetectedIPAddress(detectedPeer);
try {
- sendAsync(locMsg, null);
- sendAsync(ipMsg, null);
+ sendAsync(locMsg, null, 0);
+ sendAsync(ipMsg, null, 0);
} catch (NotConnectedException e) {
Logger.error(this, "Completed handshake with "+getPeer()+" but
disconnected!!!", new Exception("error"));
}
@@ -1419,7 +1423,7 @@
long t = tracker.getNextUrgentTime();
if(t < now) {
try {
- node.packetMangler.processOutgoing(null, 0, 0, tracker);
+ node.packetMangler.processOutgoing(null, 0, 0, tracker, 0);
} catch (NotConnectedException e) {
// Ignore
} catch (KeyChangedException e) {
@@ -1436,7 +1440,7 @@
long t = tracker.getNextUrgentTime();
if(t < now) {
try {
- node.packetMangler.processOutgoing(null, 0, 0, tracker);
+ node.packetMangler.processOutgoing(null, 0, 0, tracker, 0);
} catch (NotConnectedException e) {
// Ignore
} catch (KeyChangedException e) {
@@ -1597,7 +1601,7 @@
Logger.error(this, "No tracker to resend packet
"+item.packetNumber+" on");
continue;
}
- MessageItem mi = new MessageItem(item.buf, item.callbacks, true);
+ MessageItem mi = new MessageItem(item.buf, item.callbacks, true,
0);
requeueMessageItems(new MessageItem[] {mi}, 0, 1, true);
}
}
@@ -1747,7 +1751,7 @@
}
Message msg = DMT.createFNPLinkPing(pingNo);
try {
- sendAsync(msg, null);
+ sendAsync(msg, null, 0);
} catch (NotConnectedException e) {
synchronized(pingSync) {
pingsSentTimes.removeKey(lPingNo);
@@ -1782,7 +1786,7 @@
node.throttledPacketSendAverage.report(timeDiff);
Logger.minor(this, "Reporting throttled packet send time:
"+timeDiff+" to "+getPeer());
}
-
+
public void setRemoteDetectedPeer(Peer p) {
this.remoteDetectedPeer = p;
}
Modified: trunk/freenet/src/freenet/node/RequestHandler.java
===================================================================
--- trunk/freenet/src/freenet/node/RequestHandler.java 2006-07-07 19:29:24 UTC
(rev 9496)
+++ trunk/freenet/src/freenet/node/RequestHandler.java 2006-07-07 20:20:09 UTC
(rev 9497)
@@ -75,7 +75,7 @@
PartiallyReceivedBlock prb =
new PartiallyReceivedBlock(Node.PACKETS_IN_BLOCK,
Node.PACKET_SIZE, block.getRawData());
BlockTransmitter bt =
- new BlockTransmitter(node.usm, source, uid, prb);
+ new BlockTransmitter(node.usm, source, uid, prb,
node.outputThrottle);
bt.send();
}
return;
@@ -95,7 +95,7 @@
if(rs.waitUntilStatusChange()) {
// Forward RejectedOverload
Message msg = DMT.createFNPRejectedOverload(uid, false);
- source.sendAsync(msg, null);
+ source.sendAsync(msg, null, 0);
}
if(rs.transferStarted()) {
@@ -104,7 +104,7 @@
source.send(df);
PartiallyReceivedBlock prb = rs.getPRB();
BlockTransmitter bt =
- new BlockTransmitter(node.usm, source, uid, prb);
+ new BlockTransmitter(node.usm, source, uid, prb,
node.outputThrottle);
bt.send(); // either fails or succeeds; other side will see, we
don't care
return;
}
@@ -116,7 +116,7 @@
continue;
case RequestSender.DATA_NOT_FOUND:
Message dnf = DMT.createFNPDataNotFound(uid);
- source.sendAsync(dnf, null);
+ source.sendAsync(dnf, null, 0);
return;
case RequestSender.GENERATED_REJECTED_OVERLOAD:
case RequestSender.TIMED_OUT:
@@ -124,12 +124,12 @@
// Locally generated.
// Propagate back to source who needs to reduce send rate
Message reject = DMT.createFNPRejectedOverload(uid, true);
- source.sendAsync(reject, null);
+ source.sendAsync(reject, null, 0);
return;
case RequestSender.ROUTE_NOT_FOUND:
// Tell source
Message rnf = DMT.createFNPRouteNotFound(uid, rs.getHTL());
- source.sendAsync(rnf, null);
+ source.sendAsync(rnf, null, 0);
return;
case RequestSender.SUCCESS:
if(key instanceof NodeSSK) {
@@ -151,7 +151,7 @@
continue; // should have started transfer
}
reject = DMT.createFNPRejectedOverload(uid, true);
- source.sendAsync(reject, null);
+ source.sendAsync(reject, null, 0);
return;
case RequestSender.TRANSFER_FAILED:
if(key instanceof NodeCHK) {
Modified: trunk/freenet/src/freenet/node/SSKInsertHandler.java
===================================================================
--- trunk/freenet/src/freenet/node/SSKInsertHandler.java 2006-07-07
19:29:24 UTC (rev 9496)
+++ trunk/freenet/src/freenet/node/SSKInsertHandler.java 2006-07-07
20:20:09 UTC (rev 9497)
@@ -109,7 +109,7 @@
Logger.minor(this, "Got pubkey on
"+uid+" : "+pubKey);
Message confirm =
DMT.createFNPSSKPubKeyAccepted(uid);
try {
- source.sendAsync(confirm, null);
+ source.sendAsync(confirm, null,
0);
} catch (NotConnectedException e) {
Logger.minor(this, "Lost
connection to source on "+uid);
return;
Modified: trunk/freenet/src/freenet/node/SSKInsertSender.java
===================================================================
--- trunk/freenet/src/freenet/node/SSKInsertSender.java 2006-07-07 19:29:24 UTC
(rev 9496)
+++ trunk/freenet/src/freenet/node/SSKInsertSender.java 2006-07-07 20:20:09 UTC
(rev 9497)
@@ -172,7 +172,7 @@
// Send to next node
try {
- next.sendAsync(req, null);
+ next.sendAsync(req, null, 0);
} catch (NotConnectedException e1) {
Logger.minor(this, "Not connected to "+next);
continue;
@@ -245,7 +245,7 @@
if(msg.getBoolean(DMT.NEED_PUB_KEY)) {
Message pkMsg = DMT.createFNPSSKPubKey(uid, pubKey.asBytes());
try {
- next.sendAsync(pkMsg, null);
+ next.sendAsync(pkMsg, null, 0);
} catch (NotConnectedException e) {
Logger.minor(this, "Node disconnected while sending
pubkey: "+next);
continue;
Modified: trunk/freenet/src/freenet/node/SendMessageOnErrorCallback.java
===================================================================
--- trunk/freenet/src/freenet/node/SendMessageOnErrorCallback.java
2006-07-07 19:29:24 UTC (rev 9496)
+++ trunk/freenet/src/freenet/node/SendMessageOnErrorCallback.java
2006-07-07 20:20:09 UTC (rev 9497)
@@ -34,7 +34,7 @@
public void disconnected() {
Logger.minor(this, "Disconnect trigger: "+this);
try {
- dest.sendAsync(msg, null);
+ dest.sendAsync(msg, null, 0);
} catch (NotConnectedException e) {
Logger.minor(this, "Both source and destination disconnected:
"+msg+" for "+this);
}
Modified: trunk/freenet/src/freenet/node/Version.java
===================================================================
--- trunk/freenet/src/freenet/node/Version.java 2006-07-07 19:29:24 UTC (rev
9496)
+++ trunk/freenet/src/freenet/node/Version.java 2006-07-07 20:20:09 UTC (rev
9497)
@@ -18,7 +18,7 @@
public static final String protocolVersion = "1.0";
/** The build number of the current revision */
- private static final int buildNumber = 861;
+ private static final int buildNumber = 862;
/** Oldest build of Fred we will talk to */
private static final int oldLastGoodBuild = 839;
Modified: trunk/freenet/src/freenet/support/BitArray.java
===================================================================
--- trunk/freenet/src/freenet/support/BitArray.java 2006-07-07 19:29:24 UTC
(rev 9496)
+++ trunk/freenet/src/freenet/support/BitArray.java 2006-07-07 20:20:09 UTC
(rev 9497)
@@ -78,6 +78,10 @@
dos.write(_bits);
}
+ public static int serializedLength(int size) {
+ return ((size / 8) + (size % 8 == 0 ? 0 : 1)) + 4;
+ }
+
public int getSize() {
return _size;
}
Modified: trunk/freenet/src/freenet/support/DoubleTokenBucket.java
===================================================================
--- trunk/freenet/src/freenet/support/DoubleTokenBucket.java 2006-07-07
19:29:24 UTC (rev 9496)
+++ trunk/freenet/src/freenet/support/DoubleTokenBucket.java 2006-07-07
20:20:09 UTC (rev 9497)
@@ -28,6 +28,7 @@
*/
public DoubleTokenBucket(long max, long nanosPerTick, long
initialValue, long maxForced) {
super(max, nanosPerTick, initialValue);
+ Logger.minor(this, "Max: "+max+" nanosPerTick: "+nanosPerTick+"
initialValue: "+initialValue+" maxForced: "+maxForced);
this.maxForced = maxForced;
this.curForced = 0;
}
@@ -39,11 +40,16 @@
public synchronized void forceGrab(long tokens) {
addTokens();
long thisMax = maxForced - curForced;
- if(tokens > thisMax) tokens = thisMax;
+ if(tokens > thisMax) {
+ Logger.minor(this, "Limiting force-grab to "+thisMax+"
tokens was "+tokens);
+ tokens = thisMax;
+ }
curForced += tokens;
current -= tokens;
- if(current > max) current = max;
- if(curForced > maxForced) curForced = maxForced;
+ if(curForced > maxForced) {
+ curForced = maxForced;
+ }
+ Logger.minor(this, "Force-Grabbed "+tokens+"
current="+current+" forced="+curForced);
}
// blockingGrab is unchanged
@@ -54,12 +60,37 @@
if(curForced > maxForced) curForced = maxForced;
}
- public synchronized void addTokens() {
+ public synchronized void changeNanosAndBucketSizes(long nanos, long
newMax, long newMaxForced) {
+ // FIXME maybe should be combined
+ changeSizeOfBuckets(newMax, newMaxForced);
+ changeNanosPerTick(nanos);
+ }
+
+ public synchronized void addTokensNoClip() {
long add = tokensToAdd();
current += add;
curForced -= add;
if(curForced < 0) curForced = 0;
timeLastTick += add * nanosPerTick;
+ Logger.minor(this, "Added "+add+" tokens current="+current+"
forced="+curForced);
}
+
+ public synchronized void addTokens() {
+ addTokensNoClip();
+ if(curForced > maxForced) curForced = maxForced;
+ if(current > max) current = max;
+ }
+ public synchronized long getNanosPerTick() {
+ return nanosPerTick;
+ }
+
+ public synchronized long getSize() {
+ return max;
+ }
+
+ protected long offset() {
+ return curForced;
+ }
+
}
Modified: trunk/freenet/src/freenet/support/TokenBucket.java
===================================================================
--- trunk/freenet/src/freenet/support/TokenBucket.java 2006-07-07 19:29:24 UTC
(rev 9496)
+++ trunk/freenet/src/freenet/support/TokenBucket.java 2006-07-07 20:20:09 UTC
(rev 9497)
@@ -10,6 +10,7 @@
protected long max;
protected long timeLastTick;
protected long nanosPerTick;
+ protected long nextWake;
/**
* Create a token bucket.
@@ -20,7 +21,9 @@
this.max = max;
this.current = initialValue;
this.nanosPerTick = nanosPerTick;
- this.timeLastTick = System.currentTimeMillis();
+ long now = System.currentTimeMillis();
+ this.timeLastTick = now * (1000 * 1000);
+ nextWake = now;
}
/**
@@ -56,34 +59,72 @@
public synchronized long count() {
return current;
}
+
+ protected long offset() {
+ return 0;
+ }
+ public synchronized void blockingGrab(long tokens) {
+ if(tokens < max)
+ innerBlockingGrab(tokens);
+ else {
+ for(int i=0;i<tokens;i+=max) {
+ innerBlockingGrab(Math.min(tokens, max));
+ }
+ }
+ }
+
/**
* Grab a bunch of tokens. Block if necessary.
* @param tokens The number of tokens to grab.
*/
- public synchronized void blockingGrab(long tokens) {
+ public synchronized void innerBlockingGrab(long tokens) {
+ Logger.minor(this, "Blocking grab: "+tokens);
+ addTokens();
+ if(current > max) current = max;
+ Logger.minor(this, "current="+current);
+ if(current > tokens) {
+ current -= tokens;
+ return;
+ }
+ long extra = 0;
+ if(current > 0) {
+ tokens -= current;
+ current = 0;
+ } else if(current < 0) {
+ extra = -current;
+ current = 0;
+ }
+ long minDelayNS = nanosPerTick * (tokens + extra);
+ long minDelayMS = minDelayNS / (1000*1000) + (minDelayNS %
(1000*1000) == 0 ? 0 : 1);
+ long now = System.currentTimeMillis();
+
+ // Schedule between the blockingGrab's.
+
+ if(nextWake < now) nextWake = now;
+ long wakeAt = (nextWake += minDelayMS);
while(true) {
- addTokens();
- if(current > tokens) {
- current -= tokens;
- if(current > max) current = max;
- return;
- } else {
- if(current > 0) {
- tokens -= current;
- current = 0;
- }
- }
- long minDelayNS = nanosPerTick * Math.min(tokens,
max/2);
- long minDelayMS = minDelayNS / 1000 + (minDelayNS %
1000 == 0 ? 0 : 1);
+ now = System.currentTimeMillis();
+ int delay = (int) Math.min(Integer.MAX_VALUE, wakeAt -
now);
+ if(delay <= 0) break;
+ Logger.minor(this, "Waiting "+delay+"ms");
try {
- wait(minDelayMS);
+ wait(delay);
} catch (InterruptedException e) {
// Go around the loop again.
}
}
+ // Remove the tokens, even if we have built up a debt due to
forceGrab()s and
+ // will therefore go negative. We have paid off the initial
debt, and we have
+ // paid off the tokens, any more debt is a problem for future
blockingGrab's!
+ current -= tokens;
}
+ public synchronized void recycle(long tokens) {
+ current += tokens;
+ if(current > max) current = max;
+ }
+
/**
* Change the number of nanos per tick.
* @param nanosPerTick The new number of nanos per tick.
@@ -99,7 +140,7 @@
public synchronized void changeBucketSize(long newMax) {
if(newMax <= 0) throw new IllegalArgumentException();
- addTokens();
+ addTokensNoClip();
max = newMax;
if(current > max) current = max;
}
@@ -108,7 +149,7 @@
if(nanosPerTick <= 0) throw new IllegalArgumentException();
if(newMax <= 0) throw new IllegalArgumentException();
// Synchronize up first, using the old nanosPerTick.
- addTokens();
+ addTokensNoClip();
if(nanosPerTick < this.nanosPerTick)
notifyAll();
this.nanosPerTick = nanosPerTick;
@@ -116,10 +157,15 @@
if(current > max) current = max;
}
+ public synchronized void addTokens() {
+ addTokensNoClip();
+ if(current > max) current = max;
+ }
+
/**
* Update the number of tokens according to elapsed time.
*/
- public synchronized void addTokens() {
+ public synchronized void addTokensNoClip() {
long add = tokensToAdd();
current += add;
timeLastTick += add * nanosPerTick;
@@ -127,9 +173,11 @@
}
synchronized long tokensToAdd() {
- long nowNS = System.currentTimeMillis() * 1000;
+ long nowNS = System.currentTimeMillis() * (1000 * 1000);
long nextTick = timeLastTick + nanosPerTick;
- if(nextTick < nowNS) return 0;
+ if(nextTick > nowNS) {
+ return 0;
+ }
if(nextTick + nanosPerTick > nowNS) {
timeLastTick = nextTick;
return 1;
Modified: trunk/freenet/src/freenet/support/math/TimeDecayingRunningAverage.java
===================================================================
--- trunk/freenet/src/freenet/support/math/TimeDecayingRunningAverage.java
2006-07-07 19:29:24 UTC (rev 9496)
+++ trunk/freenet/src/freenet/support/math/TimeDecayingRunningAverage.java
2006-07-07 20:20:09 UTC (rev 9497)
@@ -156,4 +156,8 @@
public long countReports() {
return totalReports;
}
+
+ public long lastReportTime() {
+ return lastReportTime;
+ }
}