Author: toad
Date: 2006-07-08 01:42:43 +0000 (Sat, 08 Jul 2006)
New Revision: 9500
Added:
trunk/freenet/src/freenet/node/ByteCounter.java
Modified:
trunk/freenet/src/freenet/clients/http/N2NTMToadlet.java
trunk/freenet/src/freenet/io/comm/Message.java
trunk/freenet/src/freenet/io/comm/UdpSocketManager.java
trunk/freenet/src/freenet/io/xfer/BlockReceiver.java
trunk/freenet/src/freenet/io/xfer/BlockTransmitter.java
trunk/freenet/src/freenet/node/CHKInsertSender.java
trunk/freenet/src/freenet/node/FNPPacketMangler.java
trunk/freenet/src/freenet/node/InsertHandler.java
trunk/freenet/src/freenet/node/KeyTracker.java
trunk/freenet/src/freenet/node/LocationManager.java
trunk/freenet/src/freenet/node/MessageItem.java
trunk/freenet/src/freenet/node/Node.java
trunk/freenet/src/freenet/node/NodeDispatcher.java
trunk/freenet/src/freenet/node/PacketSender.java
trunk/freenet/src/freenet/node/PeerManager.java
trunk/freenet/src/freenet/node/PeerNode.java
trunk/freenet/src/freenet/node/RequestHandler.java
trunk/freenet/src/freenet/node/RequestSender.java
trunk/freenet/src/freenet/node/RequestStarter.java
trunk/freenet/src/freenet/node/SSKInsertHandler.java
trunk/freenet/src/freenet/node/SSKInsertSender.java
trunk/freenet/src/freenet/node/SendMessageOnErrorCallback.java
trunk/freenet/src/freenet/node/Version.java
trunk/freenet/src/freenet/support/DoubleTokenBucket.java
trunk/freenet/src/freenet/support/TokenBucket.java
Log:
863: Long-term bandwidth limiting by calculating bytes used per request of each
type, and allocating accordingly from a token bucket (both input and output)
when accepting requests.
HIGHLY EXPERIMENTAL.
Modified: trunk/freenet/src/freenet/clients/http/N2NTMToadlet.java
===================================================================
--- trunk/freenet/src/freenet/clients/http/N2NTMToadlet.java 2006-07-08 01:05:24 UTC (rev 9499)
+++ trunk/freenet/src/freenet/clients/http/N2NTMToadlet.java 2006-07-08 01:42:43 UTC (rev 9500)
@@ -185,7 +185,7 @@
buf.append("<div class=\"infobox-content\">");
buf.append("Peer
'"+HTMLEncoder.encode(pn.getName())+"' is \"backed off\". N2NTM receipt may be
significantly delayed.<br /><br />\n");
- usm.send(pn, n2ntm);
+ usm.send(pn, n2ntm, null);
Logger.normal(this, "Sent N2NTM to
'"+pn.getName()+"': "+message);
buf.append("Message should be on it's way:<hr
/><br /><br />"+messageTextBuf2+"<br /><br />\n");
@@ -201,7 +201,7 @@
buf.append("<div class=\"infobox-content\">");
buf.append("Sending N2NTM to peer
'"+HTMLEncoder.encode(pn.getName())+"'.<br /><br />\n");
- usm.send(pn, n2ntm);
+ usm.send(pn, n2ntm, null);
Logger.normal(this, "Sent N2NTM to
'"+pn.getName()+"': "+message);
buf.append("Message should be on it's way:<hr
/><br /><br />"+messageTextBuf2+"<br /><br />\n");
Modified: trunk/freenet/src/freenet/io/comm/Message.java
===================================================================
--- trunk/freenet/src/freenet/io/comm/Message.java 2006-07-08 01:05:24 UTC (rev 9499)
+++ trunk/freenet/src/freenet/io/comm/Message.java 2006-07-08 01:42:43 UTC (rev 9500)
@@ -39,8 +39,9 @@
private final PeerContext _source;
private final HashMap _payload = new HashMap();
public final long localInstantiationTime;
+ final int _receivedByteCount;
- public static Message decodeFromPacket(byte[] buf, int offset, int
length, PeerContext peer) {
+ public static Message decodeFromPacket(byte[] buf, int offset, int
length, PeerContext peer, int overhead) {
DataInputStream dis
= new DataInputStream(new ByteArrayInputStream(buf,
offset, length));
@@ -56,7 +57,7 @@
}
if(mspec.isInternalOnly())
return null; // silently discard internal-only messages
- Message m = new Message(mspec, peer);
+ Message m = new Message(mspec, peer, length + overhead);
try {
for (Iterator i = mspec.getOrderedFields().iterator();
i.hasNext();) {
String name = (String) i.next();
@@ -78,13 +79,14 @@
}
public Message(MessageType spec) {
- this(spec, null);
+ this(spec, null, 0);
}
- private Message(MessageType spec, PeerContext source) {
+ private Message(MessageType spec, PeerContext source, int
recvByteCount) {
localInstantiationTime = System.currentTimeMillis();
_spec = spec;
_source = source;
+ _receivedByteCount = recvByteCount;
}
public boolean getBoolean(String key) {
Modified: trunk/freenet/src/freenet/io/comm/UdpSocketManager.java
===================================================================
--- trunk/freenet/src/freenet/io/comm/UdpSocketManager.java 2006-07-08 01:05:24 UTC (rev 9499)
+++ trunk/freenet/src/freenet/io/comm/UdpSocketManager.java 2006-07-08 01:42:43 UTC (rev 9500)
@@ -25,6 +25,7 @@
import org.tanukisoftware.wrapper.WrapperManager;
import freenet.io.comm.Peer.LocalAddressException;
+import freenet.node.ByteCounter;
import freenet.node.Node;
import freenet.node.PeerNode;
import freenet.support.Logger;
@@ -207,7 +208,7 @@
} else {
// Create a bogus context since no filter
Message m = decodePacket(data, offset, length,
- new DummyPeerContext(peer));
+ new DummyPeerContext(peer), 0);
if (m != null)
checkFilters(m);
}
@@ -223,9 +224,9 @@
* @param length
* @param peer
*/
- public Message decodePacket(byte[] data, int offset, int length,
PeerContext peer) {
+ public Message decodePacket(byte[] data, int offset, int length,
PeerContext peer, int overhead) {
try {
- return Message.decodeFromPacket(data, offset, length, peer);
+ return Message.decodeFromPacket(data, offset, length, peer,
overhead);
} catch (Throwable t) {
Logger.error(this, "Could not decode packet: "+t, t);
return null;
@@ -397,7 +398,7 @@
}
}
- public Message waitFor(MessageFilter filter) throws
DisconnectedException {
+ public Message waitFor(MessageFilter filter, ByteCounter ctr) throws
DisconnectedException {
Logger.debug(this, "Waiting for "+filter);
long startTime = System.currentTimeMillis();
Message ret = null;
@@ -471,6 +472,8 @@
// }
long endTime = System.currentTimeMillis();
Logger.debug(this, "Returning in "+(endTime-startTime)+"ms");
+ if(ctr != null && ret != null)
+ ctr.receivedBytes(ret._receivedByteCount);
return ret;
}
@@ -478,7 +481,7 @@
* Send a Message to a PeerContext.
* @throws NotConnectedException If we are not currently connected to
the node.
*/
- public void send(PeerContext destination, Message m) throws
NotConnectedException {
+ public void send(PeerContext destination, Message m, ByteCounter ctr)
throws NotConnectedException {
if(m.getSpec().isInternalOnly()) {
Logger.error(this, "Trying to send internal-only message "+m+"
of spec "+m.getSpec(), new Exception("debug"));
return;
@@ -503,7 +506,7 @@
// } else {
// sendPacket(blockToSend, destination.getPeer());
// }
- ((PeerNode)destination).sendAsync(m, null, 0);
+ ((PeerNode)destination).sendAsync(m, null, 0, ctr);
}
/**
Modified: trunk/freenet/src/freenet/io/xfer/BlockReceiver.java
===================================================================
--- trunk/freenet/src/freenet/io/xfer/BlockReceiver.java 2006-07-08 01:05:24 UTC (rev 9499)
+++ trunk/freenet/src/freenet/io/xfer/BlockReceiver.java 2006-07-08 01:42:43 UTC (rev 9500)
@@ -29,6 +29,7 @@
import freenet.io.comm.PeerContext;
import freenet.io.comm.RetrievalException;
import freenet.io.comm.UdpSocketManager;
+import freenet.node.ByteCounter;
import freenet.support.BitArray;
import freenet.support.Buffer;
import freenet.support.Logger;
@@ -49,16 +50,18 @@
UdpSocketManager _usm;
/** packet : Integer -> reportTime : Long * */
HashMap _recentlyReportedMissingPackets = new HashMap();
+ ByteCounter _ctr;
- public BlockReceiver(UdpSocketManager usm, PeerContext sender, long
uid, PartiallyReceivedBlock prb) {
+ public BlockReceiver(UdpSocketManager usm, PeerContext sender, long
uid, PartiallyReceivedBlock prb, ByteCounter ctr) {
_sender = sender;
_prb = prb;
_uid = uid;
_usm = usm;
+ _ctr = ctr;
}
public void sendAborted(int reason, String desc) throws
NotConnectedException {
- _usm.send(_sender, DMT.createSendAborted(_uid, reason, desc));
+ _usm.send(_sender, DMT.createSendAborted(_uid, reason, desc),
_ctr);
}
public byte[] receive() throws RetrievalException {
@@ -70,7 +73,7 @@
MessageFilter mfPacketTransmit =
MessageFilter.create().setTimeout(RECEIPT_TIMEOUT).setType(DMT.packetTransmit).setField(DMT.UID,
_uid).setSource(_sender);
MessageFilter mfAllSent =
MessageFilter.create().setType(DMT.allSent).setField(DMT.UID,
_uid).setSource(_sender);
MessageFilter mfSendAborted =
MessageFilter.create().setType(DMT.sendAborted).setField(DMT.UID,
_uid).setSource(_sender);
- m1 =
_usm.waitFor(mfPacketTransmit.or(mfAllSent.or(mfSendAborted)));
+ m1 =
_usm.waitFor(mfPacketTransmit.or(mfAllSent.or(mfSendAborted)), _ctr);
if(!_sender.isConnected()) throw new DisconnectedException();
} catch (DisconnectedException e1) {
Logger.normal(this, "Disconnected during receive: "+_uid+"
from "+_sender);
@@ -110,7 +113,7 @@
Logger.minor(this, "Missing: "+missing.size());
if (missing.size() > 0) {
Message mn =
DMT.createMissingPacketNotification(_uid, missing);
- _usm.send(_sender, mn);
+ _usm.send(_sender, mn, _ctr);
consecutiveMissingPacketReports++;
if (missing.size() > 50) {
Logger.normal(this, "Excessive
packet loss : "+mn);
@@ -131,14 +134,14 @@
}
}
Message mn =
DMT.createMissingPacketNotification(_uid, missing);
- _usm.send(_sender, mn);
+ _usm.send(_sender, mn, _ctr);
consecutiveMissingPacketReports++;
if (missing.size() > 50) {
Logger.normal(this, "Sending large
missingPacketNotification due to packet receiver timeout after
"+RECEIPT_TIMEOUT+"ms");
}
}
}
- _usm.send(_sender, DMT.createAllReceived(_uid));
+ _usm.send(_sender, DMT.createAllReceived(_uid), _ctr);
return _prb.getBlock();
} catch(NotConnectedException e) {
throw new
RetrievalException(RetrievalException.SENDER_DISCONNECTED);
Modified: trunk/freenet/src/freenet/io/xfer/BlockTransmitter.java
===================================================================
--- trunk/freenet/src/freenet/io/xfer/BlockTransmitter.java 2006-07-08 01:05:24 UTC (rev 9499)
+++ trunk/freenet/src/freenet/io/xfer/BlockTransmitter.java 2006-07-08 01:42:43 UTC (rev 9500)
@@ -29,6 +29,7 @@
import freenet.io.comm.NotConnectedException;
import freenet.io.comm.PeerContext;
import freenet.io.comm.UdpSocketManager;
+import freenet.node.ByteCounter;
import freenet.node.FNPPacketMangler;
import freenet.node.PeerNode;
import freenet.support.BitArray;
@@ -55,14 +56,16 @@
final PacketThrottle throttle;
long timeAllSent = -1;
final DoubleTokenBucket _masterThrottle;
+ final ByteCounter _ctr;
final int PACKET_SIZE;
- public BlockTransmitter(UdpSocketManager usm, PeerContext destination,
long uid, PartiallyReceivedBlock source, DoubleTokenBucket masterThrottle) {
+ public BlockTransmitter(UdpSocketManager usm, PeerContext destination,
long uid, PartiallyReceivedBlock source, DoubleTokenBucket masterThrottle,
ByteCounter ctr) {
_usm = usm;
_destination = destination;
_uid = uid;
_prb = source;
- this._masterThrottle = masterThrottle;
+ _ctr = ctr;
+ _masterThrottle = masterThrottle;
PACKET_SIZE = DMT.packetTransmitSize(_prb._packetSize,
_prb._packets)
+ FNPPacketMangler.HEADERS_LENGTH_ONE_MESSAGE;
try {
@@ -113,13 +116,13 @@
delay(startCycleTime);
_sentPackets.setBit(packetNo,
true);
try {
-
((PeerNode)_destination).sendAsync(DMT.createPacketTransmit(_uid, packetNo,
_sentPackets, _prb.getPacket(packetNo)), null, PACKET_SIZE);
+
((PeerNode)_destination).sendAsync(DMT.createPacketTransmit(_uid, packetNo,
_sentPackets, _prb.getPacket(packetNo)), null, PACKET_SIZE, _ctr);
// We accelerate the ping rate
during the transfer to keep a closer eye on round-trip-time
sentSinceLastPing++;
if (sentSinceLastPing >=
PING_EVERY) {
sentSinceLastPing = 0;
//_usm.send(BlockTransmitter.this._destination, DMT.createPing());
-
((PeerNode)_destination).sendAsync(DMT.createPing(), null, 0);
+
((PeerNode)_destination).sendAsync(DMT.createPing(), null, 0, _ctr);
}
} catch (NotConnectedException
e) {
Logger.normal(this,
"Terminating send: "+e);
@@ -172,7 +175,7 @@
}
public void sendAborted(int reason, String desc) throws
NotConnectedException {
- _usm.send(_destination, DMT.createSendAborted(_uid, reason,
desc));
+ _usm.send(_destination, DMT.createSendAborted(_uid, reason,
desc), _ctr);
}
public boolean send() {
@@ -194,7 +197,7 @@
public void receiveAborted(int reason, String
description) {
try {
-
((PeerNode)_destination).sendAsync(DMT.createSendAborted(_uid, reason,
description), null, 0);
+
((PeerNode)_destination).sendAsync(DMT.createSendAborted(_uid, reason,
description), null, 0, _ctr);
} catch (NotConnectedException e) {
Logger.minor(this, "Receive aborted and receiver is not
connected");
}
@@ -215,7 +218,7 @@
MessageFilter mfMissingPacketNotification =
MessageFilter.create().setType(DMT.missingPacketNotification).setField(DMT.UID,
_uid).setTimeout(SEND_TIMEOUT).setSource(_destination);
MessageFilter mfAllReceived =
MessageFilter.create().setType(DMT.allReceived).setField(DMT.UID,
_uid).setTimeout(SEND_TIMEOUT).setSource(_destination);
MessageFilter mfSendAborted =
MessageFilter.create().setType(DMT.sendAborted).setField(DMT.UID,
_uid).setTimeout(SEND_TIMEOUT).setSource(_destination);
- msg =
_usm.waitFor(mfMissingPacketNotification.or(mfAllReceived.or(mfSendAborted)));
+ msg =
_usm.waitFor(mfMissingPacketNotification.or(mfAllReceived.or(mfSendAborted)),
_ctr);
Logger.minor(this, "Got "+msg);
} catch (DisconnectedException e) {
// Ignore, see below
Added: trunk/freenet/src/freenet/node/ByteCounter.java
===================================================================
--- trunk/freenet/src/freenet/node/ByteCounter.java 2006-07-08 01:05:24 UTC (rev 9499)
+++ trunk/freenet/src/freenet/node/ByteCounter.java 2006-07-08 01:42:43 UTC (rev 9500)
@@ -0,0 +1,12 @@
+package freenet.node;
+
+/**
+ * Interface for something which counts bytes.
+ */
+public interface ByteCounter {
+
+ public void sentBytes(int x);
+
+ public void receivedBytes(int x);
+
+}
Modified: trunk/freenet/src/freenet/node/CHKInsertSender.java
===================================================================
--- trunk/freenet/src/freenet/node/CHKInsertSender.java 2006-07-08 01:05:24 UTC (rev 9499)
+++ trunk/freenet/src/freenet/node/CHKInsertSender.java 2006-07-08 01:42:43 UTC (rev 9500)
@@ -16,7 +16,7 @@
import freenet.keys.NodeCHK;
import freenet.support.Logger;
-public final class CHKInsertSender implements Runnable, AnyInsertSender {
+public final class CHKInsertSender implements Runnable, AnyInsertSender,
ByteCounter {
private static class Sender implements Runnable {
@@ -66,7 +66,7 @@
AwaitingCompletion(PeerNode pn, PartiallyReceivedBlock prb) {
this.pn = pn;
- bt = new BlockTransmitter(node.usm, pn, uid, prb,
node.outputThrottle);
+ bt = new BlockTransmitter(node.usm, pn, uid, prb,
node.outputThrottle, CHKInsertSender.this);
}
void start() {
@@ -114,8 +114,6 @@
}
}
-
-
CHKInsertSender(NodeCHK myKey, long uid, byte[] headers, short htl,
PeerNode source, Node node, PartiallyReceivedBlock prb, boolean
fromStore, double closestLocation) {
this.myKey = myKey;
@@ -263,7 +261,7 @@
// Send to next node
try {
- next.send(req);
+ next.send(req, this);
} catch (NotConnectedException e1) {
Logger.minor(this, "Not connected to "+next);
continue;
@@ -282,7 +280,7 @@
while (true) {
try {
- msg = node.usm.waitFor(mf);
+ msg = node.usm.waitFor(mf, this);
} catch (DisconnectedException e) {
Logger.normal(this, "Disconnected from
" + next
+ " while waiting for
Accepted");
@@ -363,7 +361,7 @@
Logger.minor(this, "Sending DataInsert");
if(receiveFailed) return;
try {
- next.send(dataInsert);
+ next.send(dataInsert, this);
} catch (NotConnectedException e1) {
Logger.minor(this, "Not connected sending
DataInsert: "+next+" for "+uid);
continue;
@@ -385,7 +383,7 @@
return;
try {
- msg = node.usm.waitFor(mf);
+ msg = node.usm.waitFor(mf, this);
} catch (DisconnectedException e) {
Logger.normal(this, "Disconnected from
" + next
+ " while waiting for
InsertReply on " + this);
@@ -549,6 +547,15 @@
notifyAll();
}
+ if(code != TIMED_OUT && code != GENERATED_REJECTED_OVERLOAD && code !=
INTERNAL_ERROR
+ && code != ROUTE_REALLY_NOT_FOUND) {
+ Logger.minor(this, "CHK insert cost
"+getTotalSentBytes()+"/"+getTotalReceivedBytes()+" bytes ("+code+")");
+ (source == null ? node.localChkInsertBytesSentAverage :
node.remoteChkInsertBytesSentAverage)
+ .report(getTotalSentBytes());
+ (source == null ? node.localChkInsertBytesReceivedAverage :
node.remoteChkInsertBytesReceivedAverage)
+ .report(getTotalReceivedBytes());
+ }
+
Logger.minor(this, "Returning from finish()");
}
@@ -695,7 +702,7 @@
} else {
Message m;
try {
- m = node.usm.waitFor(mf);
+ m = node.usm.waitFor(mf,
CHKInsertSender.this);
} catch (DisconnectedException e) {
// Which one? I have no idea.
// Go around the loop again.
@@ -819,4 +826,33 @@
public long getUID() {
return uid;
}
+
+ private final Object totalBytesSync = new Object();
+ private int totalBytesSent;
+
+ public void sentBytes(int x) {
+ synchronized(totalBytesSync) {
+ totalBytesSent += x;
+ }
+ }
+
+ public int getTotalSentBytes() {
+ synchronized(totalBytesSync) {
+ return totalBytesSent;
+ }
+ }
+
+ private int totalBytesReceived;
+
+ public void receivedBytes(int x) {
+ synchronized(totalBytesSync) {
+ totalBytesReceived += x;
+ }
+ }
+
+ public int getTotalReceivedBytes() {
+ synchronized(totalBytesSync) {
+ return totalBytesReceived;
+ }
+ }
}
Modified: trunk/freenet/src/freenet/node/FNPPacketMangler.java
===================================================================
--- trunk/freenet/src/freenet/node/FNPPacketMangler.java 2006-07-08 01:05:24 UTC (rev 9499)
+++ trunk/freenet/src/freenet/node/FNPPacketMangler.java 2006-07-08 01:42:43 UTC (rev 9500)
@@ -630,7 +630,7 @@
node.random.acceptEntropyBytes(myPacketDataSource, packetHash, 0,
md.getDigestLength(), 0.5);
// Lots more to do yet!
- processDecryptedData(plaintext, seqNumber, tracker);
+ processDecryptedData(plaintext, seqNumber, tracker, length -
plaintext.length);
return true;
}
@@ -640,7 +640,7 @@
* @param seqNumber The detected sequence number of the packet.
* @param tracker The KeyTracker responsible for the key used to encrypt
the packet.
*/
- private void processDecryptedData(byte[] decrypted, int seqNumber,
KeyTracker tracker) {
+ private void processDecryptedData(byte[] decrypted, int seqNumber,
KeyTracker tracker, int overhead) {
/**
* Decoded format:
* 1 byte - version number (0)
@@ -777,6 +777,8 @@
int messages = decrypted[ptr++] & 0xff;
+ overhead += ptr;
+
for(int i=0;i<messages;i++) {
if(ptr+1 >= decrypted.length) {
Logger.error(this, "Packet not long enough at byte "+ptr+" on
"+tracker);
@@ -788,7 +790,7 @@
return;
}
Logger.minor(this, "Message "+i+" length "+length+", hash code:
"+Fields.hashCode(decrypted, ptr, length));
- Message m = usm.decodePacket(decrypted, ptr, length, tracker.pn);
+ Message m = usm.decodePacket(decrypted, ptr, length, tracker.pn, 1
+ (overhead / messages));
ptr+=length;
if(m != null) {
//Logger.minor(this, "Dispatching packet: "+m);
@@ -805,6 +807,7 @@
Logger.minor(this, "processOutgoingOrRequeue "+messages.length+"
messages for "+pn+" ("+neverWaitForPacketNumber+")");
byte[][] messageData = new byte[messages.length][];
int[] alreadyReported = new int[messages.length];
+ MessageItem[] newMsgs = new MessageItem[messages.length];
int length = 1;
int callbacksCount = 0;
int x = 0;
@@ -821,35 +824,45 @@
}
int packetNumber =
kt.allocateOutgoingPacketNumberNeverBlock();
this.processOutgoingPreformatted(buf, 0, buf.length,
pn.getCurrentKeyTracker(), packetNumber, mi.cb, mi.alreadyReportedBytes);
+ if(mi.ctrCallback != null)
+ mi.ctrCallback.sentBytes(buf.length +
HEADERS_LENGTH_ONE_MESSAGE);
} catch (NotConnectedException e) {
Logger.minor(this, "Caught "+e+" while sending messages,
requeueing");
// Requeue
- if(!dontRequeue)
+ if(!dontRequeue) {
+ pn.requeueMessageItems(messages, 0, x, false,
"NotConnectedException");
pn.requeueMessageItems(messages, i, messages.length-i,
false, "NotConnectedException");
+ }
return;
} catch (WouldBlockException e) {
Logger.minor(this, "Caught "+e+" while sending messages,
requeueing", e);
// Requeue
- if(!dontRequeue)
- pn.requeueMessageItems(messages, i, messages.length-i,
false, "WouldBlockException");
+ if(!dontRequeue) {
+ pn.requeueMessageItems(messages, 0, x, false,
"NotConnectedException");
+ pn.requeueMessageItems(messages, i, messages.length-i,
false, "NotConnectedException");
+ }
return;
} catch (KeyChangedException e) {
Logger.minor(this, "Caught "+e+" while sending messages,
requeueing");
// Requeue
- if(!dontRequeue)
- pn.requeueMessageItems(messages, i, messages.length-i,
false, "KeyChangedException");
+ if(!dontRequeue) {
+ pn.requeueMessageItems(messages, 0, x, false,
"NotConnectedException");
+ pn.requeueMessageItems(messages, i, messages.length-i,
false, "NotConnectedException");
+ }
return;
} catch (Throwable e) {
Logger.error(this, "Caught "+e+" while sending messages,
requeueing", e);
// Requeue
- if(!dontRequeue)
- pn.requeueMessageItems(messages, i, messages.length-i,
false, "Throwable");
+ if(!dontRequeue) {
+ pn.requeueMessageItems(messages, 0, x, false,
"NotConnectedException");
+ pn.requeueMessageItems(messages, i, messages.length-i,
false, "NotConnectedException");
+ }
return;
-
}
} else {
byte[] data = mi.getData(this, pn);
messageData[x] = data;
+ newMsgs[x] = mi;
alreadyReported[x] = mi.alreadyReportedBytes;
x++;
if(mi.cb != null) callbacksCount += mi.cb.length;
@@ -876,9 +889,17 @@
if(x != callbacksCount) throw new IllegalStateException();
if(length < node.usm.getMaxPacketSize() &&
- messages.length < 256) {
+ messageData.length < 256) {
try {
innerProcessOutgoing(messageData, 0, messageData.length,
length, pn, neverWaitForPacketNumber, callbacks, alreadyReportedBytes);
+ for(int i=0;i<messageData.length;i++) {
+ MessageItem mi = newMsgs[i];
+ if(mi.ctrCallback != null) {
+ mi.ctrCallback.sentBytes(messageData[i].length
+
+ 1 + (HEADERS_LENGTH_MINIMUM /
messageData.length));
+ // FIXME rounding issues
+ }
+ }
} catch (NotConnectedException e) {
Logger.normal(this, "Caught "+e+" while sending messages,
requeueing");
// Requeue
@@ -920,6 +941,14 @@
if(lastIndex != i) {
try {
innerProcessOutgoing(messageData, lastIndex,
i-lastIndex, length, pn, neverWaitForPacketNumber, callbacks,
alreadyReportedBytes);
+ for(int j=lastIndex;j<i;j++) {
+ MessageItem mi = newMsgs[j];
+ if(mi.ctrCallback != null) {
+
mi.ctrCallback.sentBytes(messageData[j].length +
+ 1 +
(HEADERS_LENGTH_MINIMUM / (i-lastIndex)));
+ // FIXME rounding issues
+ }
+ }
} catch (NotConnectedException e) {
Logger.normal(this, "Caught "+e+" while sending
messages, requeueing remaining messages");
// Requeue
@@ -941,7 +970,7 @@
}
}
lastIndex = i;
- if(i != messages.length)
+ if(i != messageData.length)
length = 1 + (messageData[i].length + 2);
count = 0;
} else {
Modified: trunk/freenet/src/freenet/node/InsertHandler.java
===================================================================
--- trunk/freenet/src/freenet/node/InsertHandler.java 2006-07-08 01:05:24 UTC (rev 9499)
+++ trunk/freenet/src/freenet/node/InsertHandler.java 2006-07-08 01:42:43 UTC (rev 9500)
@@ -80,7 +80,7 @@
// Send Accepted
Message accepted = DMT.createFNPAccepted(uid);
try {
- source.send(accepted);
+ source.send(accepted, null);
} catch (NotConnectedException e1) {
Logger.minor(this, "Lost connection to source");
return;
@@ -93,7 +93,7 @@
Message msg;
try {
- msg = node.usm.waitFor(mf);
+ msg = node.usm.waitFor(mf, null);
} catch (DisconnectedException e) {
Logger.normal(this, "Disconnected while waiting for DataInsert on
"+uid);
return;
@@ -106,11 +106,11 @@
if(source.isConnected() && startTime >
(source.timeLastConnected()+Node.HANDSHAKE_TIMEOUT*4))
Logger.error(this, "Did not receive DataInsert
on "+uid+" from "+source+" !");
Message tooSlow = DMT.createFNPRejectedTimeout(uid);
- source.sendAsync(tooSlow, null, 0);
+ source.sendAsync(tooSlow, null, 0, null);
Message m = DMT.createFNPInsertTransfersCompleted(uid,
true);
- source.sendAsync(m, null, 0);
+ source.sendAsync(m, null, 0, null);
prb = new PartiallyReceivedBlock(Node.PACKETS_IN_BLOCK,
Node.PACKET_SIZE);
- br = new BlockReceiver(node.usm, source, uid, prb);
+ br = new BlockReceiver(node.usm, source, uid, prb,
null);
prb.abort(RetrievalException.NO_DATAINSERT, "No
DataInsert");
br.sendAborted(RetrievalException.NO_DATAINSERT, "No
DataInsert");
return;
@@ -132,7 +132,7 @@
prb = new PartiallyReceivedBlock(Node.PACKETS_IN_BLOCK,
Node.PACKET_SIZE);
if(htl > 0)
sender = node.makeInsertSender(key, htl, uid, source, headers,
prb, false, closestLoc, true);
- br = new BlockReceiver(node.usm, source, uid, prb);
+ br = new BlockReceiver(node.usm, source, uid, prb, null);
// Receive the data, off thread
@@ -146,7 +146,7 @@
msg = DMT.createFNPInsertReply(uid);
sentSuccess = true;
try {
- source.send(msg);
+ source.send(msg, null);
} catch (NotConnectedException e) {
// Ignore
}
@@ -195,7 +195,7 @@
// Forward it
Message m = DMT.createFNPRejectedOverload(uid, false);
try {
- source.send(m);
+ source.send(m, null);
} catch (NotConnectedException e) {
Logger.minor(this, "Lost connection to
source");
return;
@@ -225,7 +225,7 @@
status == CHKInsertSender.INTERNAL_ERROR) {
msg = DMT.createFNPRejectedOverload(uid, true);
try {
- source.send(msg);
+ source.send(msg, null);
} catch (NotConnectedException e) {
Logger.minor(this, "Lost connection to
source");
return;
@@ -241,7 +241,7 @@
if(status == CHKInsertSender.ROUTE_NOT_FOUND || status ==
CHKInsertSender.ROUTE_REALLY_NOT_FOUND) {
msg = DMT.createFNPRouteNotFound(uid, sender.getHTL());
try {
- source.send(msg);
+ source.send(msg, null);
} catch (NotConnectedException e) {
Logger.minor(this, "Lost connection to
source");
return;
@@ -255,7 +255,7 @@
msg = DMT.createFNPInsertReply(uid);
sentSuccess = true;
try {
- source.send(msg);
+ source.send(msg, null);
} catch (NotConnectedException e) {
Logger.minor(this, "Lost connection to
source");
return;
@@ -269,7 +269,7 @@
Logger.error(this, "Unknown status code:
"+sender.getStatusString());
msg = DMT.createFNPRejectedOverload(uid, true);
try {
- source.send(msg);
+ source.send(msg, null);
} catch (NotConnectedException e) {
// Ignore
}
@@ -314,7 +314,7 @@
boolean failed = sender.anyTransfersFailed();
Message m = DMT.createFNPInsertTransfersCompleted(uid, failed);
try {
- source.sendAsync(m, null, 0);
+ source.sendAsync(m, null, 0, null);
Logger.minor(this, "Sent completion: "+failed+" for
"+this);
} catch (NotConnectedException e1) {
Logger.minor(this, "Not connected: "+source+" for
"+this);
@@ -344,7 +344,7 @@
}
if(toSend != null) {
try {
- source.sendAsync(toSend, null, 0);
+ source.sendAsync(toSend, null, 0, null);
} catch (NotConnectedException e) {
// :(
Logger.minor(this, "Lost connection in "+this+" when sending
FNPDataInsertRejected");
@@ -368,7 +368,7 @@
runThread.interrupt();
Message msg = DMT.createFNPDataInsertRejected(uid,
DMT.DATA_INSERT_REJECTED_RECEIVE_FAILED);
try {
- source.send(msg);
+ source.send(msg, null);
} catch (NotConnectedException ex) {
Logger.error(this, "Can't send "+msg+" to "+source+":
"+ex);
}
Modified: trunk/freenet/src/freenet/node/KeyTracker.java
===================================================================
--- trunk/freenet/src/freenet/node/KeyTracker.java 2006-07-08 01:05:24 UTC (rev 9499)
+++ trunk/freenet/src/freenet/node/KeyTracker.java 2006-07-08 01:42:43 UTC (rev 9500)
@@ -842,7 +842,7 @@
AsyncMessageCallback[] callbacks = element.callbacks;
// Ignore packet#
Logger.minor(this, "Queueing resend of what was once
"+element.packetNumber);
- messages[i] = new MessageItem(buf, callbacks, true, 0);
+ messages[i] = new MessageItem(buf, callbacks, true, 0, null);
}
pn.requeueMessageItems(messages, 0, messages.length, true);
pn.node.ps.queuedResendPacket();
Modified: trunk/freenet/src/freenet/node/LocationManager.java
===================================================================
--- trunk/freenet/src/freenet/node/LocationManager.java 2006-07-08 01:05:24 UTC (rev 9499)
+++ trunk/freenet/src/freenet/node/LocationManager.java 2006-07-08 01:42:43 UTC (rev 9500)
@@ -233,11 +233,11 @@
MessageFilter filter =
MessageFilter.create().setType(DMT.FNPSwapCommit).setField(DMT.UID,
uid).setTimeout(TIMEOUT).setSource(pn);
- node.usm.send(pn, m);
+ node.usm.send(pn, m, null);
Message commit;
try {
- commit = node.usm.waitFor(filter);
+ commit = node.usm.waitFor(filter, null);
} catch (DisconnectedException e) {
Logger.minor(this, "Disconnected from "+pn+" while waiting for
SwapCommit");
return;
@@ -295,7 +295,7 @@
Message confirm = DMT.createFNPSwapComplete(uid, myValue);
- node.usm.send(pn, confirm);
+ node.usm.send(pn, confirm, null);
if(shouldSwap(myLoc, friendLocs, hisLoc, hisFriendLocs, random ^
hisRandom)) {
timeLastSuccessfullySwapped = System.currentTimeMillis();
@@ -374,12 +374,12 @@
// 60 seconds
filter.setTimeout(TIMEOUT);
- node.usm.send(pn, m);
+ node.usm.send(pn, m, null);
Logger.minor(this, "Waiting for SwapReply/SwapRejected on
"+uid);
Message reply;
try {
- reply = node.usm.waitFor(filter);
+ reply = node.usm.waitFor(filter, null);
} catch (DisconnectedException e) {
Logger.minor(this, "Disconnected while waiting for
SwapReply/SwapRejected for "+uid);
return;
@@ -409,12 +409,12 @@
MessageFilter filter3 =
MessageFilter.create().setField(DMT.UID,
uid).setType(DMT.FNPSwapComplete).setTimeout(TIMEOUT).setSource(pn);
filter = filter1.or(filter3);
- node.usm.send(pn, confirm);
+ node.usm.send(pn, confirm, null);
Logger.minor(this, "Waiting for SwapComplete: uid = "+uid);
try {
- reply = node.usm.waitFor(filter);
+ reply = node.usm.waitFor(filter, null);
} catch (DisconnectedException e) {
Logger.minor(this, "Disconnected waiting for SwapComplete
on "+uid);
return;
@@ -675,7 +675,7 @@
// Reject
Message reject = DMT.createFNPSwapRejected(uid);
try {
- pn.sendAsync(reject, null, 0);
+ pn.sendAsync(reject, null, 0, null);
} catch (NotConnectedException e) {
Logger.minor(this, "Lost connection to "+pn+" rejecting
SwapRequest");
}
@@ -687,7 +687,7 @@
// Reject
Message reject = DMT.createFNPSwapRejected(uid);
try {
- pn.sendAsync(reject, null, 0);
+ pn.sendAsync(reject, null, 0, null);
} catch (NotConnectedException e) {
Logger.minor(this, "Lost connection rejecting SwapRequest from
"+pn);
}
@@ -705,7 +705,7 @@
// Reject
Message reject = DMT.createFNPSwapRejected(uid);
try {
- pn.sendAsync(reject, null, 0);
+ pn.sendAsync(reject, null, 0, null);
} catch (NotConnectedException e1) {
Logger.minor(this, "Lost connection rejecting SwapRequest
(locked) from "+pn);
}
@@ -740,7 +740,7 @@
Logger.minor(this, "Late reject "+uid);
Message reject = DMT.createFNPSwapRejected(uid);
try {
- pn.sendAsync(reject, null, 0);
+ pn.sendAsync(reject, null, 0, null);
} catch (NotConnectedException e1) {
Logger.normal(this, "Late reject but disconnected from
sender: "+pn);
}
@@ -754,7 +754,7 @@
// Forward the request.
// Note that we MUST NOT send this blocking as we are on
the
// receiver thread.
- randomPeer.sendAsync(m, new
MyCallback(DMT.createFNPSwapRejected(uid), pn, item), 0);
+ randomPeer.sendAsync(m, new
MyCallback(DMT.createFNPSwapRejected(uid), pn, item), 0, null);
} catch (NotConnectedException e) {
// Try a different node
continue;
@@ -801,7 +801,7 @@
m.set(DMT.UID, item.incomingID);
Logger.minor(this, "Forwarding SwapReply "+uid+" from
"+m.getSource()+" to "+item.requestSender);
try {
- item.requestSender.sendAsync(m, null, 0);
+ item.requestSender.sendAsync(m, null, 0, null);
} catch (NotConnectedException e) {
Logger.minor(this, "Lost connection forwarding SwapReply "+uid+"
to "+item.requestSender);
}
@@ -833,7 +833,7 @@
// Returning to source - use incomingID
m.set(DMT.UID, item.incomingID);
try {
- item.requestSender.sendAsync(m, null, 0);
+ item.requestSender.sendAsync(m, null, 0, null);
} catch (NotConnectedException e) {
Logger.minor(this, "Lost connection forwarding SwapRejected
"+uid+" to "+item.requestSender);
}
@@ -860,7 +860,7 @@
// Sending onwards - use outgoing ID
m.set(DMT.UID, item.outgoingID);
try {
- item.routedTo.sendAsync(m, new
SendMessageOnErrorCallback(DMT.createFNPSwapRejected(item.incomingID),
item.requestSender), 0);
+ item.routedTo.sendAsync(m, new
SendMessageOnErrorCallback(DMT.createFNPSwapRejected(item.incomingID),
item.requestSender), 0, null);
} catch (NotConnectedException e) {
Logger.minor(this, "Lost connection forwarding SwapCommit "+uid+"
to "+item.routedTo);
}
@@ -897,7 +897,7 @@
// Returning to source - use incomingID
m.set(DMT.UID, item.incomingID);
try {
- item.requestSender.sendAsync(m, null, 0);
+ item.requestSender.sendAsync(m, null, 0, null);
} catch (NotConnectedException e) {
Logger.normal(this, "Lost connection forwarding SwapComplete
"+uid+" to "+item.requestSender);
}
@@ -943,7 +943,7 @@
Message msg = DMT.createFNPSwapRejected(item.incomingID);
Logger.minor(this, "Rejecting in lostOrRestartedNode:
"+item.incomingID+ " from "+item.requestSender);
try {
- item.requestSender.sendAsync(msg, null, 0);
+ item.requestSender.sendAsync(msg, null, 0, null);
} catch (NotConnectedException e1) {
Logger.normal(this, "Both sender and receiver disconnected for
"+item);
}
Modified: trunk/freenet/src/freenet/node/MessageItem.java
===================================================================
--- trunk/freenet/src/freenet/node/MessageItem.java 2006-07-08 01:05:24 UTC
(rev 9499)
+++ trunk/freenet/src/freenet/node/MessageItem.java 2006-07-08 01:42:43 UTC
(rev 9500)
@@ -14,22 +14,25 @@
* for sending as a single packet.
*/
final boolean formatted;
+ final ByteCounter ctrCallback;
- public MessageItem(Message msg2, AsyncMessageCallback[] cb2, int
alreadyReportedBytes) {
+ public MessageItem(Message msg2, AsyncMessageCallback[] cb2, int
alreadyReportedBytes, ByteCounter ctr) {
this.alreadyReportedBytes = alreadyReportedBytes;
this.msg = msg2;
this.cb = cb2;
buf = null;
formatted = false;
+ this.ctrCallback = ctr;
this.submitted = System.currentTimeMillis();
}
- public MessageItem(byte[] data, AsyncMessageCallback[] cb2, boolean
formatted, int alreadyReportedBytes) {
+ public MessageItem(byte[] data, AsyncMessageCallback[] cb2, boolean
formatted, int alreadyReportedBytes, ByteCounter ctr) {
this.alreadyReportedBytes = alreadyReportedBytes;
this.cb = cb2;
this.msg = null;
this.buf = data;
this.formatted = formatted;
+ this.ctrCallback = ctr;
this.submitted = System.currentTimeMillis();
}
Modified: trunk/freenet/src/freenet/node/Node.java
===================================================================
--- trunk/freenet/src/freenet/node/Node.java 2006-07-08 01:05:24 UTC (rev
9499)
+++ trunk/freenet/src/freenet/node/Node.java 2006-07-08 01:42:43 UTC (rev
9500)
@@ -120,6 +120,7 @@
import freenet.support.PaddedEphemerallyEncryptedBucketFactory;
import freenet.support.SimpleFieldSet;
import freenet.support.SimpleReadOnlyArrayBucket;
+import freenet.support.TokenBucket;
import freenet.support.io.FilenameGenerator;
import freenet.support.io.PersistentTempBucketFactory;
import freenet.support.io.TempBucketFactory;
@@ -562,6 +563,9 @@
final TestnetHandler testnetHandler;
final StaticSwapRequestInterval swapInterval;
public final DoubleTokenBucket outputThrottle;
+ final TokenBucket requestOutputThrottle;
+ final TokenBucket requestInputThrottle;
+ private boolean inputLimitDefault = false;
static short MAX_HTL = 10;
static final int EXIT_STORE_FILE_NOT_FOUND = 1;
static final int EXIT_STORE_IOEXCEPTION = 2;
@@ -614,6 +618,22 @@
final TimeDecayingRunningAverage throttledPacketSendAverage;
/** Must be included as a hidden field in order for any dangerous HTTP
operation to complete successfully. */
public final String formPassword;
+ final TimeDecayingRunningAverage remoteChkFetchBytesSentAverage;
+ final TimeDecayingRunningAverage remoteSskFetchBytesSentAverage;
+ final TimeDecayingRunningAverage remoteChkInsertBytesSentAverage;
+ final TimeDecayingRunningAverage remoteSskInsertBytesSentAverage;
+ final TimeDecayingRunningAverage remoteChkFetchBytesReceivedAverage;
+ final TimeDecayingRunningAverage remoteSskFetchBytesReceivedAverage;
+ final TimeDecayingRunningAverage remoteChkInsertBytesReceivedAverage;
+ final TimeDecayingRunningAverage remoteSskInsertBytesReceivedAverage;
+ final TimeDecayingRunningAverage localChkFetchBytesSentAverage;
+ final TimeDecayingRunningAverage localSskFetchBytesSentAverage;
+ final TimeDecayingRunningAverage localChkInsertBytesSentAverage;
+ final TimeDecayingRunningAverage localSskInsertBytesSentAverage;
+ final TimeDecayingRunningAverage localChkFetchBytesReceivedAverage;
+ final TimeDecayingRunningAverage localSskFetchBytesReceivedAverage;
+ final TimeDecayingRunningAverage localChkInsertBytesReceivedAverage;
+ final TimeDecayingRunningAverage localSskInsertBytesReceivedAverage;
File downloadDir;
public final ClientRequestScheduler chkFetchScheduler;
@@ -1208,15 +1228,44 @@
//return
BlockTransmitter.getHardBandwidthLimit();
return (int) ((1000L * 1000L *
1000L) / outputThrottle.getNanosPerTick());
}
- public void set(int val) throws
InvalidConfigValueException {
- if(val <= 0) throw new
InvalidConfigValueException("Bandwidth limit must be positive");
-
outputThrottle.changeNanosAndBucketSizes((1000L * 1000L * 1000L) / val, val,
(val * 4) / 5);
+ public void set(int obwLimit) throws
InvalidConfigValueException {
+ if(obwLimit <= 0) throw new
InvalidConfigValueException("Bandwidth limit must be positive");
+
outputThrottle.changeNanosAndBucketSizes((1000L * 1000L * 1000L) / obwLimit,
obwLimit/2, (obwLimit * 2) / 5);
+
requestOutputThrottle.changeNanosAndBucketSize((1000L*1000L*1000L) / obwLimit,
Math.max(obwLimit*60, 32768*20));
+ if(inputLimitDefault) {
+ int ibwLimit = obwLimit
* 4;
+
requestInputThrottle.changeNanosAndBucketSize((1000L*1000L*1000L) / ibwLimit,
Math.max(ibwLimit*60, 32768*20));
+ }
}
});
int obwLimit = nodeConfig.getInt("outputBandwidthLimit");
- this.outputThrottle = new DoubleTokenBucket(obwLimit/2,
(1000L*1000L*1000L) / obwLimit, obwLimit, (obwLimit * 4) / 5);
+ outputThrottle = new DoubleTokenBucket(obwLimit/2,
(1000L*1000L*1000L) / obwLimit, obwLimit, (obwLimit * 2) / 5);
+ requestOutputThrottle =
+ new TokenBucket(Math.max(obwLimit*60, 32768*20),
(1000L*1000L*1000L) / obwLimit, 0);
+ nodeConfig.register("inputBandwidthLimit", "-1", sortOrder++,
false,
+ "Input bandwidth limit (bytes per second)",
"Input bandwidth limit (bytes/sec); the node will try not to exceed this; -1 =
4x set outputBandwidthLimit",
+ new IntCallback() {
+ public int get() {
+ if(inputLimitDefault) return -1;
+ return (int) ((1000L * 1000L *
1000L) / requestInputThrottle.getNanosPerTick());
+ }
+ public void set(int ibwLimit) throws
InvalidConfigValueException {
+ // -1 means "track 4x the output limit"; an explicit value must switch that tracking off again.
+ boolean useDefault = (ibwLimit == -1);
+ if(useDefault) ibwLimit = (int)
((1000L * 1000L * 1000L) / outputThrottle.getNanosPerTick()) * 4;
+ if(ibwLimit <= 0) throw new
InvalidConfigValueException("Bandwidth limit must be positive or -1");
+ inputLimitDefault = useDefault; // committed only after the value has been validated
+
requestInputThrottle.changeNanosAndBucketSize((1000L*1000L*1000L) / ibwLimit,
Math.max(ibwLimit*60, 32768*20));
+ }
+ });
+
+ int ibwLimit = nodeConfig.getInt("inputBandwidthLimit");
+ if(ibwLimit == -1) { inputLimitDefault = true; ibwLimit = obwLimit * 4; } // registered
default is -1: derive 4x the output limit instead of building the bucket from a negative rate
+ requestInputThrottle =
+ new TokenBucket(Math.max(ibwLimit*60, 32768*20),
(1000L*1000L*1000L) / ibwLimit, 0);
+
// FIXME add an averaging/long-term/soft bandwidth limit. (bug
76)
// SwapRequestInterval
@@ -1522,35 +1571,54 @@
// Select the request scheduler
+ localChkFetchBytesSentAverage = new
TimeDecayingRunningAverage(0.0, 180000, 0.0, Long.MAX_VALUE);
+ localSskFetchBytesSentAverage = new
TimeDecayingRunningAverage(0.0, 180000, 0.0, Long.MAX_VALUE);
+ localChkInsertBytesSentAverage = new
TimeDecayingRunningAverage(32768, 180000, 0.0, Long.MAX_VALUE);
+ localSskInsertBytesSentAverage = new
TimeDecayingRunningAverage(2048, 180000, 0.0, Long.MAX_VALUE);
+ localChkFetchBytesReceivedAverage = new
TimeDecayingRunningAverage(32768, 180000, 0.0, Long.MAX_VALUE);
+ localSskFetchBytesReceivedAverage = new
TimeDecayingRunningAverage(2048, 180000, 0.0, Long.MAX_VALUE);
+ localChkInsertBytesReceivedAverage = new
TimeDecayingRunningAverage(0.0, 180000, 0.0, Long.MAX_VALUE);
+ localSskInsertBytesReceivedAverage = new
TimeDecayingRunningAverage(0.0, 180000, 0.0, Long.MAX_VALUE);
+
+ remoteChkFetchBytesSentAverage = new
TimeDecayingRunningAverage(0.0, 180000, 0.0, Long.MAX_VALUE);
+ remoteSskFetchBytesSentAverage = new
TimeDecayingRunningAverage(0.0, 180000, 0.0, Long.MAX_VALUE);
+ remoteChkInsertBytesSentAverage = new
TimeDecayingRunningAverage(32768, 180000, 0.0, Long.MAX_VALUE);
+ remoteSskInsertBytesSentAverage = new
TimeDecayingRunningAverage(2048, 180000, 0.0, Long.MAX_VALUE);
+ remoteChkFetchBytesReceivedAverage = new
TimeDecayingRunningAverage(32768, 180000, 0.0, Long.MAX_VALUE);
+ remoteSskFetchBytesReceivedAverage = new
TimeDecayingRunningAverage(2048, 180000, 0.0, Long.MAX_VALUE);
+ remoteChkInsertBytesReceivedAverage = new
TimeDecayingRunningAverage(0.0, 180000, 0.0, Long.MAX_VALUE);
+ remoteSskInsertBytesReceivedAverage = new
TimeDecayingRunningAverage(0.0, 180000, 0.0, Long.MAX_VALUE);
+
// FIXME make all the below arbitrary constants configurable!
archiveManager = new ArchiveManager(MAX_ARCHIVE_HANDLERS,
MAX_CACHED_ARCHIVE_DATA, MAX_ARCHIVE_SIZE, MAX_ARCHIVED_FILE_SIZE,
MAX_CACHED_ELEMENTS, random, tempFilenameGenerator);
chkRequestThrottle = new MyRequestThrottle(throttleWindow,
5000, "CHK Request");
- chkRequestStarter = new RequestStarter(this,
chkRequestThrottle, "CHK Request starter ("+portNumber+")");
+ chkRequestStarter = new RequestStarter(this,
chkRequestThrottle, "CHK Request starter ("+portNumber+")",
requestOutputThrottle, requestInputThrottle, localChkFetchBytesSentAverage,
localChkFetchBytesReceivedAverage);
chkFetchScheduler = new ClientRequestScheduler(false, false,
random, chkRequestStarter, this);
chkRequestStarter.setScheduler(chkFetchScheduler);
chkRequestStarter.start();
//insertThrottle = new ChainedRequestThrottle(10000, 2.0F,
requestThrottle);
// FIXME reenable the above
chkInsertThrottle = new MyRequestThrottle(throttleWindow,
20000, "CHK Insert");
- chkInsertStarter = new RequestStarter(this, chkInsertThrottle,
"CHK Insert starter ("+portNumber+")");
+ chkInsertStarter = new RequestStarter(this, chkInsertThrottle,
"CHK Insert starter ("+portNumber+")", requestOutputThrottle,
requestInputThrottle, localChkInsertBytesSentAverage,
localChkInsertBytesReceivedAverage);
chkPutScheduler = new ClientRequestScheduler(true, false,
random, chkInsertStarter, this);
chkInsertStarter.setScheduler(chkPutScheduler);
chkInsertStarter.start();
sskRequestThrottle = new MyRequestThrottle(throttleWindow,
5000, "SSK Request");
- sskRequestStarter = new RequestStarter(this,
sskRequestThrottle, "SSK Request starter ("+portNumber+")");
+ sskRequestStarter = new RequestStarter(this,
sskRequestThrottle, "SSK Request starter ("+portNumber+")",
requestOutputThrottle, requestInputThrottle, localSskFetchBytesSentAverage,
localSskFetchBytesReceivedAverage);
sskFetchScheduler = new ClientRequestScheduler(false, true,
random, sskRequestStarter, this);
sskRequestStarter.setScheduler(sskFetchScheduler);
sskRequestStarter.start();
//insertThrottle = new ChainedRequestThrottle(10000, 2.0F,
requestThrottle);
// FIXME reenable the above
sskInsertThrottle = new MyRequestThrottle(throttleWindow,
20000, "SSK Insert");
- sskInsertStarter = new RequestStarter(this, sskInsertThrottle,
"SSK Insert starter ("+portNumber+")");
+ sskInsertStarter = new RequestStarter(this, sskInsertThrottle,
"SSK Insert starter ("+portNumber+")", requestOutputThrottle,
requestInputThrottle, localSskInsertBytesSentAverage,
localSskInsertBytesReceivedAverage); // insert starter is costed with the SSK insert averages, not the fetch ones
sskPutScheduler = new ClientRequestScheduler(true, true,
random, sskInsertStarter, this);
sskInsertStarter.setScheduler(sskPutScheduler);
sskInsertStarter.start();
+
nodeConfig.finishedInitialization();
writeNodeFile();
@@ -2174,11 +2242,29 @@
+ FNPPacketMangler.HEADERS_LENGTH_ONE_MESSAGE;
/* return reject reason as string if should reject, otherwise return null
*/
- public synchronized String shouldRejectRequest(boolean canAcceptAnyway)
{
+ public synchronized String shouldRejectRequest(boolean canAcceptAnyway,
boolean isInsert, boolean isSSK) {
long now = System.currentTimeMillis();
+
+ dumpByteCostAverages();
double bwlimitDelayTime =
throttledPacketSendAverage.currentValue();
+ // Do we have the bandwidth?
+
+ double expected =
+ (isInsert ? (isSSK ?
this.remoteSskInsertBytesSentAverage : this.remoteChkInsertBytesSentAverage)
+ : (isSSK ?
this.remoteSskFetchBytesSentAverage :
this.remoteChkFetchBytesSentAverage)).currentValue();
+ int e = (int)Math.max(expected, 0);
+ if(!requestOutputThrottle.instantGrab(e)) return "Insufficient
output bandwidth";
+ expected =
+ (isInsert ? (isSSK ?
this.remoteSskInsertBytesReceivedAverage :
this.remoteChkInsertBytesReceivedAverage)
+ : (isSSK ?
this.remoteSskFetchBytesReceivedAverage :
this.remoteChkFetchBytesReceivedAverage)).currentValue();
+ e = (int)Math.max(expected, 0);
+ if(!requestInputThrottle.instantGrab(e)) return "Insufficient
input bandwidth";
+
+
+ // If no recent reports, no packets have been sent; correct the
average downwards.
+
if(throttledPacketSendAverage.lastReportTime() <
System.currentTimeMillis() - 5000) {
outputThrottle.blockingGrab(ESTIMATED_SIZE_OF_ONE_THROTTLED_PACKET);
outputThrottle.recycle(ESTIMATED_SIZE_OF_ONE_THROTTLED_PACKET);
@@ -2229,6 +2315,19 @@
return null;
}
+ private void dumpByteCostAverages() { // logs, at minor level, the current sent/received byte-cost average of every request type
+ Logger.minor(this, "Byte cost averages: REMOTE:"+
+ " CHK insert
"+remoteChkInsertBytesSentAverage.currentValue()+"/"+remoteChkInsertBytesReceivedAverage.currentValue()+
+ " SSK insert
"+remoteSskInsertBytesSentAverage.currentValue()+"/"+remoteSskInsertBytesReceivedAverage.currentValue()+
+ " CHK fetch
"+remoteChkFetchBytesSentAverage.currentValue()+"/"+remoteChkFetchBytesReceivedAverage.currentValue()+
+ " SSK fetch
"+remoteSskFetchBytesSentAverage.currentValue()+"/"+remoteSskFetchBytesReceivedAverage.currentValue());
+ Logger.minor(this, "Byte cost averages: LOCAL:"+
+ " CHK insert
"+localChkInsertBytesSentAverage.currentValue()+"/"+localChkInsertBytesReceivedAverage.currentValue()+
+ " SSK insert
"+localSskInsertBytesSentAverage.currentValue()+"/"+localSskInsertBytesReceivedAverage.currentValue()+
+ " CHK fetch
"+localChkFetchBytesSentAverage.currentValue()+"/"+localChkFetchBytesReceivedAverage.currentValue()+
+ " SSK fetch
"+localSskFetchBytesSentAverage.currentValue()+"/"+localSskFetchBytesReceivedAverage.currentValue());
+ }
+
public SimpleFieldSet exportPrivateFieldSet() {
SimpleFieldSet fs = exportPublicFieldSet();
fs.put("dsaPrivKey", myPrivKey.asFieldSet());
@@ -2387,7 +2486,7 @@
try {
//MessageFilter mf2 =
MessageFilter.create().setField(DMT.UID,
uid).setType(DMT.FNPRoutedRejected).setTimeout(5000);
// Ignore Rejected - let it be retried on other peers
- m = usm.waitFor(mf1/*.or(mf2)*/);
+ m = usm.waitFor(mf1/*.or(mf2)*/, null);
} catch (DisconnectedException e) {
Logger.normal(this, "Disconnected in waiting for pong");
return -1;
Modified: trunk/freenet/src/freenet/node/NodeDispatcher.java
===================================================================
--- trunk/freenet/src/freenet/node/NodeDispatcher.java 2006-07-08 01:05:24 UTC
(rev 9499)
+++ trunk/freenet/src/freenet/node/NodeDispatcher.java 2006-07-08 01:42:43 UTC
(rev 9500)
@@ -45,7 +45,7 @@
// Send an FNPPong
Message reply = DMT.createFNPPong(m.getInt(DMT.PING_SEQNO));
try {
- ((PeerNode)m.getSource()).sendAsync(reply, null, 0); //
nothing we can do if can't contact source
+ ((PeerNode)m.getSource()).sendAsync(reply, null, 0, null); //
nothing we can do if can't contact source
} catch (NotConnectedException e) {
Logger.minor(this, "Lost connection replying to "+m);
}
@@ -83,7 +83,7 @@
long id = m.getLong(DMT.PING_SEQNO);
Message msg = DMT.createFNPLinkPong(id);
try {
- source.sendAsync(msg, null, 0);
+ source.sendAsync(msg, null, 0, null);
} catch (NotConnectedException e) {
// Ignore
}
@@ -113,19 +113,19 @@
if(node.recentlyCompleted(id)) {
Message rejected = DMT.createFNPRejectedLoop(id);
try {
- ((PeerNode)(m.getSource())).sendAsync(rejected, null, 0);
+ ((PeerNode)(m.getSource())).sendAsync(rejected, null, 0, null);
} catch (NotConnectedException e) {
Logger.normal(this, "Rejecting data request (loop, finished):
"+e);
}
return true;
}
- String rejectReason = node.shouldRejectRequest(!isSSK);
+ String rejectReason = node.shouldRejectRequest(!isSSK, false, isSSK);
if(rejectReason != null) {
// can accept 1 CHK request every so often, but not with SSKs
because they aren't throttled so won't sort out bwlimitDelayTime, which was the
whole reason for accepting them when overloaded...
Logger.normal(this, "Rejecting request from
"+m.getSource().getPeer()+" preemptively because "+rejectReason);
Message rejected = DMT.createFNPRejectedOverload(id, true);
try {
- ((PeerNode)(m.getSource())).sendAsync(rejected, null,
0);
+ ((PeerNode)(m.getSource())).sendAsync(rejected, null,
0, null);
} catch (NotConnectedException e) {
Logger.normal(this, "Rejecting (overload) data request from
"+m.getSource().getPeer()+": "+e);
}
@@ -136,7 +136,7 @@
Logger.minor(this, "Could not lock ID "+id+" -> rejecting (already
running)");
Message rejected = DMT.createFNPRejectedLoop(id);
try {
- ((PeerNode)(m.getSource())).sendAsync(rejected, null, 0);
+ ((PeerNode)(m.getSource())).sendAsync(rejected, null, 0, null);
} catch (NotConnectedException e) {
Logger.normal(this, "Rejecting insert request from
"+m.getSource().getPeer()+": "+e);
}
@@ -158,19 +158,19 @@
if(node.recentlyCompleted(id)) {
Message rejected = DMT.createFNPRejectedLoop(id);
try {
- ((PeerNode)(m.getSource())).sendAsync(rejected, null, 0);
+ ((PeerNode)(m.getSource())).sendAsync(rejected, null, 0, null);
} catch (NotConnectedException e) {
Logger.normal(this, "Rejecting insert request from
"+m.getSource().getPeer()+": "+e);
}
return true;
}
// SSKs don't fix bwlimitDelayTime so shouldn't be accepted when
overloaded.
- String rejectReason = node.shouldRejectRequest(!isSSK);
+ String rejectReason = node.shouldRejectRequest(!isSSK, true, isSSK);
if(rejectReason != null) {
Logger.normal(this, "Rejecting insert from
"+m.getSource().getPeer()+" preemptively because "+rejectReason);
Message rejected = DMT.createFNPRejectedOverload(id, true);
try {
- ((PeerNode)(m.getSource())).sendAsync(rejected, null,
0);
+ ((PeerNode)(m.getSource())).sendAsync(rejected, null,
0, null);
} catch (NotConnectedException e) {
Logger.normal(this, "Rejecting (overload) insert request from
"+m.getSource().getPeer()+": "+e);
}
@@ -181,7 +181,7 @@
Logger.minor(this, "Could not lock ID "+id+" -> rejecting (already
running)");
Message rejected = DMT.createFNPRejectedLoop(id);
try {
- ((PeerNode)(m.getSource())).sendAsync(rejected, null, 0);
+ ((PeerNode)(m.getSource())).sendAsync(rejected, null, 0, null);
} catch (NotConnectedException e) {
Logger.normal(this, "Rejecting insert request from
"+m.getSource().getPeer()+": "+e);
}
@@ -269,7 +269,7 @@
ctx = (RoutedContext)routedContexts.get(lid);
if(ctx != null) {
try {
-
((PeerNode)m.getSource()).sendAsync(DMT.createFNPRoutedRejected(id,
(short)(htl-1)), null, 0);
+
((PeerNode)m.getSource()).sendAsync(DMT.createFNPRoutedRejected(id,
(short)(htl-1)), null, 0, null);
} catch (NotConnectedException e) {
Logger.minor(this, "Lost connection rejecting "+m);
}
@@ -289,7 +289,7 @@
} else if(htl == 0) {
Message reject = DMT.createFNPRoutedRejected(id, (short)0);
if(pn != null) try {
- pn.sendAsync(reject, null, 0);
+ pn.sendAsync(reject, null, 0, null);
} catch (NotConnectedException e) {
Logger.minor(this, "Lost connection rejecting "+m);
}
@@ -311,7 +311,7 @@
PeerNode pn = ctx.source;
if(pn == null) return false;
try {
- pn.sendAsync(m, null, 0);
+ pn.sendAsync(m, null, 0, null);
} catch (NotConnectedException e) {
Logger.minor(this, "Lost connection forwarding "+m+" to "+pn);
}
@@ -330,7 +330,7 @@
Logger.minor(this, "Forwarding "+m.getSpec()+" to
"+next.getPeer().getPort());
ctx.addSent(next);
try {
- next.sendAsync(m, null, 0);
+ next.sendAsync(m, null, 0, null);
} catch (NotConnectedException e) {
continue;
}
@@ -339,7 +339,7 @@
// Reached a dead end...
Message reject = DMT.createFNPRoutedRejected(id, htl);
if(pn != null) try {
- pn.sendAsync(reject, null, 0);
+ pn.sendAsync(reject, null, 0, null);
} catch (NotConnectedException e) {
Logger.error(this, "Cannot send reject message back to
source "+pn);
return true;
@@ -374,7 +374,7 @@
int x = m.getInt(DMT.COUNTER);
Message reply = DMT.createFNPRoutedPong(id, x);
try {
- src.sendAsync(reply, null, 0);
+ src.sendAsync(reply, null, 0, null);
} catch (NotConnectedException e) {
Logger.minor(this, "Lost connection replying to "+m+" in
dispatchRoutedMessage");
}
Modified: trunk/freenet/src/freenet/node/PacketSender.java
===================================================================
--- trunk/freenet/src/freenet/node/PacketSender.java 2006-07-08 01:05:24 UTC
(rev 9499)
+++ trunk/freenet/src/freenet/node/PacketSender.java 2006-07-08 01:42:43 UTC
(rev 9500)
@@ -250,7 +250,7 @@
// Force packet to have a sequence number.
Message m = DMT.createFNPVoid();
pn.addToLocalNodeSentMessagesToStatistic(m);
- node.packetMangler.processOutgoingOrRequeue(new
MessageItem[] { new MessageItem(m, null, 0) }, pn, true, true);
+ node.packetMangler.processOutgoingOrRequeue(new
MessageItem[] { new MessageItem(m, null, 0, null) }, pn, true, true);
}
} else {
// Not connected
Modified: trunk/freenet/src/freenet/node/PeerManager.java
===================================================================
--- trunk/freenet/src/freenet/node/PeerManager.java 2006-07-08 01:05:24 UTC
(rev 9499)
+++ trunk/freenet/src/freenet/node/PeerManager.java 2006-07-08 01:42:43 UTC
(rev 9500)
@@ -328,7 +328,7 @@
PeerNode[] peers = connectedPeers; // avoid synchronization
for(int i=0;i<peers.length;i++) {
if(peers[i].isConnected()) try {
- peers[i].sendAsync(msg, null, 0);
+ peers[i].sendAsync(msg, null, 0, null);
} catch (NotConnectedException e) {
// Ignore
}
Modified: trunk/freenet/src/freenet/node/PeerNode.java
===================================================================
--- trunk/freenet/src/freenet/node/PeerNode.java 2006-07-08 01:05:24 UTC
(rev 9499)
+++ trunk/freenet/src/freenet/node/PeerNode.java 2006-07-08 01:42:43 UTC
(rev 9500)
@@ -692,13 +692,13 @@
* relating to this packet (normally set when we have delayed a packet in
order to
* throttle it).
*/
- public void sendAsync(Message msg, AsyncMessageCallback cb, int
alreadyReportedBytes) throws NotConnectedException {
+ public void sendAsync(Message msg, AsyncMessageCallback cb, int
alreadyReportedBytes, ByteCounter ctr) throws NotConnectedException {
Logger.minor(this, "Sending async: "+msg+" : "+cb+" on "+this);
if(!isConnected) throw new NotConnectedException();
- MessageItem item = new MessageItem(msg, cb == null ? null : new
AsyncMessageCallback[] {cb}, alreadyReportedBytes);
- synchronized(routingBackoffSync) {
- reportBackoffStatus(System.currentTimeMillis());
- }
+ MessageItem item = new MessageItem(msg, cb == null ? null : new
AsyncMessageCallback[] {cb}, alreadyReportedBytes, ctr);
+ synchronized(routingBackoffSync) {
+ reportBackoffStatus(System.currentTimeMillis());
+ }
synchronized(messagesToSendNow) {
messagesToSendNow.addLast(item);
}
@@ -915,10 +915,10 @@
*/
public boolean ping(int pingID) throws NotConnectedException {
Message ping = DMT.createFNPPing(pingID);
- node.usm.send(this, ping);
+ node.usm.send(this, ping, null);
Message msg;
try {
- msg =
node.usm.waitFor(MessageFilter.create().setTimeout(2000).setType(DMT.FNPPong).setField(DMT.PING_SEQNO,
pingID));
+ msg =
node.usm.waitFor(MessageFilter.create().setTimeout(2000).setType(DMT.FNPPong).setField(DMT.PING_SEQNO,
pingID), null);
} catch (DisconnectedException e) {
throw new NotConnectedException("Disconnected while waiting for
pong");
}
@@ -949,14 +949,14 @@
/**
* Send a message, right now, on this thread, to this node.
*/
- public void send(Message req) throws NotConnectedException {
+ public void send(Message req, ByteCounter ctr) throws
NotConnectedException {
synchronized (this) {
if(!isConnected()) {
Logger.error(this, "Tried to send "+req+" but not connected to
"+this, new Exception("debug"));
return;
}
}
- node.usm.send(this, req);
+ node.usm.send(this, req, ctr);
}
/**
@@ -1207,8 +1207,8 @@
Message ipMsg = DMT.createFNPDetectedIPAddress(detectedPeer);
try {
- sendAsync(locMsg, null, 0);
- sendAsync(ipMsg, null, 0);
+ sendAsync(locMsg, null, 0, null);
+ sendAsync(ipMsg, null, 0, null);
} catch (NotConnectedException e) {
Logger.error(this, "Completed handshake with "+getPeer()+" but
disconnected!!!", new Exception("error"));
}
@@ -1601,7 +1601,7 @@
Logger.error(this, "No tracker to resend packet
"+item.packetNumber+" on");
continue;
}
- MessageItem mi = new MessageItem(item.buf, item.callbacks, true,
0);
+ MessageItem mi = new MessageItem(item.buf, item.callbacks, true,
0, null);
requeueMessageItems(new MessageItem[] {mi}, 0, 1, true);
}
}
@@ -1751,7 +1751,7 @@
}
Message msg = DMT.createFNPLinkPing(pingNo);
try {
- sendAsync(msg, null, 0);
+ sendAsync(msg, null, 0, null);
} catch (NotConnectedException e) {
synchronized(pingSync) {
pingsSentTimes.removeKey(lPingNo);
Modified: trunk/freenet/src/freenet/node/RequestHandler.java
===================================================================
--- trunk/freenet/src/freenet/node/RequestHandler.java 2006-07-08 01:05:24 UTC
(rev 9499)
+++ trunk/freenet/src/freenet/node/RequestHandler.java 2006-07-08 01:42:43 UTC
(rev 9500)
@@ -56,26 +56,26 @@
htl = source.decrementHTL(htl);
Message accepted = DMT.createFNPAccepted(uid);
- source.send(accepted);
+ source.send(accepted, null);
Object o = node.makeRequestSender(key, htl, uid, source, closestLoc,
false, true, false);
if(o instanceof KeyBlock) {
KeyBlock block = (KeyBlock) o;
Message df = createDataFound(block);
- source.send(df);
+ source.send(df, null);
if(key instanceof NodeSSK) {
if(needsPubKey) {
DSAPublicKey key =
((NodeSSK)block.getKey()).getPubKey();
Message pk = DMT.createFNPSSKPubKey(uid, key.asBytes());
Logger.minor(this, "Sending PK: "+key+"
"+key.writeAsField());
- source.send(pk);
+ source.send(pk, null);
}
}
if(block instanceof CHKBlock) {
PartiallyReceivedBlock prb =
new PartiallyReceivedBlock(Node.PACKETS_IN_BLOCK,
Node.PACKET_SIZE, block.getRawData());
BlockTransmitter bt =
- new BlockTransmitter(node.usm, source, uid, prb,
node.outputThrottle);
+ new BlockTransmitter(node.usm, source, uid, prb,
node.outputThrottle, null);
bt.send();
}
return;
@@ -84,7 +84,7 @@
if(rs == null) { // ran out of htl?
Message dnf = DMT.createFNPDataNotFound(uid);
- source.send(dnf);
+ source.send(dnf, null);
return;
}
@@ -95,16 +95,16 @@
if(rs.waitUntilStatusChange()) {
// Forward RejectedOverload
Message msg = DMT.createFNPRejectedOverload(uid, false);
- source.sendAsync(msg, null, 0);
+ source.sendAsync(msg, null, 0, null);
}
if(rs.transferStarted()) {
// Is a CHK.
Message df = DMT.createFNPCHKDataFound(uid, rs.getHeaders());
- source.send(df);
+ source.send(df, null);
PartiallyReceivedBlock prb = rs.getPRB();
BlockTransmitter bt =
- new BlockTransmitter(node.usm, source, uid, prb,
node.outputThrottle);
+ new BlockTransmitter(node.usm, source, uid, prb,
node.outputThrottle, null);
bt.send(); // either fails or succeeds; other side will see, we
don't care
return;
}
@@ -116,7 +116,7 @@
continue;
case RequestSender.DATA_NOT_FOUND:
Message dnf = DMT.createFNPDataNotFound(uid);
- source.sendAsync(dnf, null, 0);
+ source.sendAsync(dnf, null, 0, null);
return;
case RequestSender.GENERATED_REJECTED_OVERLOAD:
case RequestSender.TIMED_OUT:
@@ -124,20 +124,20 @@
// Locally generated.
// Propagate back to source who needs to reduce send rate
Message reject = DMT.createFNPRejectedOverload(uid, true);
- source.sendAsync(reject, null, 0);
+ source.sendAsync(reject, null, 0, null);
return;
case RequestSender.ROUTE_NOT_FOUND:
// Tell source
Message rnf = DMT.createFNPRouteNotFound(uid, rs.getHTL());
- source.sendAsync(rnf, null, 0);
+ source.sendAsync(rnf, null, 0, null);
return;
case RequestSender.SUCCESS:
if(key instanceof NodeSSK) {
Message df = DMT.createFNPSSKDataFound(uid,
rs.getHeaders(), rs.getSSKData());
- source.send(df);
+ source.send(df, null);
if(needsPubKey) {
Message pk = DMT.createFNPSSKPubKey(uid,
((NodeSSK)rs.getSSKBlock().getKey()).getPubKey().asBytes());
- source.send(df);
+ source.send(pk, null); // send the public key just built; the data-found message was already sent above
}
} else if(!rs.transferStarted()) {
Logger.error(this, "Status is SUCCESS but we
never started a transfer on "+uid);
@@ -151,7 +151,7 @@
continue; // should have started transfer
}
reject = DMT.createFNPRejectedOverload(uid, true);
- source.sendAsync(reject, null, 0);
+ source.sendAsync(reject, null, 0, null);
return;
case RequestSender.TRANSFER_FAILED:
if(key instanceof NodeCHK) {
Modified: trunk/freenet/src/freenet/node/RequestSender.java
===================================================================
--- trunk/freenet/src/freenet/node/RequestSender.java 2006-07-08 01:05:24 UTC
(rev 9499)
+++ trunk/freenet/src/freenet/node/RequestSender.java 2006-07-08 01:42:43 UTC
(rev 9500)
@@ -34,7 +34,7 @@
* transferring senders when starts transferring, and remove from it
* when finishes transferring.
*/
-public final class RequestSender implements Runnable {
+public final class RequestSender implements Runnable, ByteCounter {
// Constants
static final int ACCEPTED_TIMEOUT = 5000;
@@ -141,7 +141,7 @@
Message req = createDataRequest();
- next.send(req);
+ next.send(req, this);
sentRequest = true;
Message msg = null;
@@ -166,7 +166,7 @@
MessageFilter mf =
mfAccepted.or(mfRejectedLoop.or(mfRejectedOverload));
try {
- msg = node.usm.waitFor(mf);
+ msg = node.usm.waitFor(mf, this);
Logger.minor(this, "first part got "+msg);
} catch (DisconnectedException e) {
Logger.normal(this, "Disconnected from "+next+" while
waiting for Accepted on "+uid);
@@ -235,7 +235,7 @@
try {
- msg = node.usm.waitFor(mf);
+ msg = node.usm.waitFor(mf, this);
} catch (DisconnectedException e) {
Logger.normal(this, "Disconnected from "+next+" while
waiting for data on "+uid);
break;
@@ -302,7 +302,7 @@
notifyAll();
}
- BlockReceiver br = new BlockReceiver(node.usm,
next, uid, prb);
+ BlockReceiver br = new BlockReceiver(node.usm,
next, uid, prb, this);
try {
Logger.minor(this, "Receiving data");
@@ -402,6 +402,7 @@
return;
} catch (KeyCollisionException e) {
Logger.normal(this, "Collision on "+this);
+ finish(SUCCESS, next);
}
}
@@ -502,6 +503,22 @@
if(status != NOT_FINISHED)
throw new IllegalStateException("finish() called with "+code+"
when was already "+status);
status = code;
+
+ if(status != TIMED_OUT && status != GENERATED_REJECTED_OVERLOAD &&
status != INTERNAL_ERROR) {
+ if(key instanceof NodeSSK) {
+ Logger.minor(this, "SSK fetch cost
"+getTotalSentBytes()+"/"+getTotalReceivedBytes()+" bytes ("+status+")");
+ (source == null ? node.localSskFetchBytesSentAverage :
node.remoteSskFetchBytesSentAverage)
+ .report(getTotalSentBytes());
+ (source == null ? node.localSskFetchBytesReceivedAverage :
node.remoteSskFetchBytesReceivedAverage)
+ .report(getTotalReceivedBytes());
+ } else {
+ Logger.minor(this, "CHK fetch cost
"+getTotalSentBytes()+"/"+getTotalReceivedBytes()+" bytes ("+status+")");
+ (source == null ? node.localChkFetchBytesSentAverage :
node.remoteChkFetchBytesSentAverage)
+ .report(getTotalSentBytes());
+ (source == null ? node.localChkFetchBytesReceivedAverage :
node.remoteChkFetchBytesReceivedAverage)
+ .report(getTotalReceivedBytes());
+ }
+ }
synchronized(this) {
notifyAll();
@@ -527,4 +544,33 @@
public SSKBlock getSSKBlock() {
return block;
}
+
+ private final Object totalBytesSync = new Object(); // lock guarding both byte totals below
+ private int totalBytesSent; // running total accumulated by sentBytes()
+
+ public void sentBytes(int x) { // adds x to the sent-bytes total
+ synchronized(totalBytesSync) {
+ totalBytesSent += x;
+ }
+ }
+
+ public int getTotalSentBytes() { // returns the sent-bytes total, read under the lock
+ synchronized(totalBytesSync) {
+ return totalBytesSent;
+ }
+ }
+
+ private int totalBytesReceived; // running total accumulated by receivedBytes()
+
+ public void receivedBytes(int x) { // adds x to the received-bytes total
+ synchronized(totalBytesSync) {
+ totalBytesReceived += x;
+ }
+ }
+
+ public int getTotalReceivedBytes() { // returns the received-bytes total, read under the lock
+ synchronized(totalBytesSync) {
+ return totalBytesReceived;
+ }
+ }
}
Modified: trunk/freenet/src/freenet/node/RequestStarter.java
===================================================================
--- trunk/freenet/src/freenet/node/RequestStarter.java 2006-07-08 01:05:24 UTC
(rev 9499)
+++ trunk/freenet/src/freenet/node/RequestStarter.java 2006-07-08 01:42:43 UTC
(rev 9500)
@@ -3,6 +3,8 @@
import freenet.client.async.RequestScheduler;
import freenet.client.async.SendableRequest;
import freenet.support.Logger;
+import freenet.support.TokenBucket;
+import freenet.support.math.RunningAverage;
/**
* Starts requests.
@@ -37,14 +39,23 @@
}
final BaseRequestThrottle throttle;
+ final TokenBucket inputBucket; // blockingGrab() is called on this with the current averageInputBytesPerRequest
+ final TokenBucket outputBucket; // blockingGrab() is called on this with the current averageOutputBytesPerRequest
+ final RunningAverage averageInputBytesPerRequest; // currentValue(), floored at 0 and cast to int, sizes each inputBucket grab
+ final RunningAverage averageOutputBytesPerRequest; // currentValue(), floored at 0 and cast to int, sizes each outputBucket grab
RequestScheduler sched;
final Node node;
private long sentRequestTime;
- public RequestStarter(Node node, BaseRequestThrottle throttle, String
name) {
+ public RequestStarter(Node node, BaseRequestThrottle throttle, String
name, TokenBucket outputBucket, TokenBucket inputBucket,
+ RunningAverage averageOutputBytesPerRequest,
RunningAverage averageInputBytesPerRequest) { // NOTE: in both pairs the output-side argument comes BEFORE the input-side one
this.node = node;
this.throttle = throttle;
this.name = name;
+ this.outputBucket = outputBucket; // stored as passed; no null check is made here
+ this.inputBucket = inputBucket; // stored as passed; no null check is made here
+ this.averageOutputBytesPerRequest =
averageOutputBytesPerRequest;
+ this.averageInputBytesPerRequest = averageInputBytesPerRequest;
}
void setScheduler(RequestScheduler sched) {
@@ -92,6 +103,8 @@
long delay = throttle.getDelay();
Logger.minor(this, "Delay="+delay+" from
"+throttle);
long sleepUntil = sentRequestTime + delay;
+ inputBucket.blockingGrab((int)(Math.max(0,
averageInputBytesPerRequest.currentValue()))); // a negative average is clamped to 0 before the int cast. NOTE(review): presumably blocks until that many tokens are available - confirm what happens if the amount exceeds the bucket size
+ outputBucket.blockingGrab((int)(Math.max(0,
averageOutputBytesPerRequest.currentValue()))); // same clamp-and-grab for the output side; runs only after the input grab has returned
long now;
do {
now = System.currentTimeMillis();
Modified: trunk/freenet/src/freenet/node/SSKInsertHandler.java
===================================================================
--- trunk/freenet/src/freenet/node/SSKInsertHandler.java 2006-07-08
01:05:24 UTC (rev 9499)
+++ trunk/freenet/src/freenet/node/SSKInsertHandler.java 2006-07-08
01:42:43 UTC (rev 9500)
@@ -85,7 +85,7 @@
Message accepted = DMT.createFNPSSKAccepted(uid, pubKey == null);
try {
- source.send(accepted);
+ source.send(accepted, null);
} catch (NotConnectedException e1) {
Logger.minor(this, "Lost connection to source");
return;
@@ -98,7 +98,7 @@
MessageFilter mfPK =
MessageFilter.create().setType(DMT.FNPSSKPubKey).setField(DMT.UID,
uid).setSource(source).setTimeout(PUBKEY_TIMEOUT);
try {
- Message pk = node.usm.waitFor(mfPK);
+ Message pk = node.usm.waitFor(mfPK, null);
if(pk == null) {
Logger.normal(this, "Failed to receive
FNPSSKPubKey for "+uid);
return;
@@ -109,7 +109,7 @@
Logger.minor(this, "Got pubkey on
"+uid+" : "+pubKey);
Message confirm =
DMT.createFNPSSKPubKeyAccepted(uid);
try {
- source.sendAsync(confirm, null,
0);
+ source.sendAsync(confirm, null,
0, null);
} catch (NotConnectedException e) {
Logger.minor(this, "Lost
connection to source on "+uid);
return;
@@ -118,7 +118,7 @@
Logger.error(this, "Invalid pubkey from
"+source+" on "+uid);
Message msg =
DMT.createFNPDataInsertRejected(uid, DMT.DATA_INSERT_REJECTED_SSK_ERROR);
try {
- source.send(msg);
+ source.send(msg, null);
} catch (NotConnectedException ee) {
// Ignore
}
@@ -137,7 +137,7 @@
Logger.error(this, "Invalid SSK from "+source, e1);
Message msg = DMT.createFNPDataInsertRejected(uid,
DMT.DATA_INSERT_REJECTED_SSK_ERROR);
try {
- source.send(msg);
+ source.send(msg, null);
} catch (NotConnectedException e) {
// Ignore
}
@@ -149,7 +149,7 @@
if(storedBlock != null && !storedBlock.equals(block)) {
Message msg = DMT.createFNPSSKDataFound(uid,
storedBlock.getRawHeaders(), storedBlock.getRawData());
try {
- source.send(msg);
+ source.send(msg, null);
} catch (NotConnectedException e) {
Logger.minor(this, "Lost connection to source
on "+uid);
}
@@ -162,7 +162,7 @@
Message msg = DMT.createFNPInsertReply(uid);
sentSuccess = true;
try {
- source.send(msg);
+ source.send(msg, null);
} catch (NotConnectedException e) {
// Ignore
}
@@ -191,7 +191,7 @@
// Forward it
Message m = DMT.createFNPRejectedOverload(uid, false);
try {
- source.send(m);
+ source.send(m, null);
} catch (NotConnectedException e) {
Logger.minor(this, "Lost connection to
source");
return;
@@ -210,7 +210,7 @@
}
Message msg = DMT.createFNPSSKDataFound(uid, headers, data);
try {
- source.send(msg);
+ source.send(msg, null);
} catch (NotConnectedException e) {
Logger.minor(this, "Lost connection to source");
return;
@@ -231,7 +231,7 @@
status == SSKInsertSender.INTERNAL_ERROR) {
Message msg = DMT.createFNPRejectedOverload(uid, true);
try {
- source.send(msg);
+ source.send(msg, null);
} catch (NotConnectedException e) {
Logger.minor(this, "Lost connection to
source");
return;
@@ -247,7 +247,7 @@
if(status == SSKInsertSender.ROUTE_NOT_FOUND || status ==
SSKInsertSender.ROUTE_REALLY_NOT_FOUND) {
Message msg = DMT.createFNPRouteNotFound(uid, sender.getHTL());
try {
- source.send(msg);
+ source.send(msg, null);
} catch (NotConnectedException e) {
Logger.minor(this, "Lost connection to
source");
return;
@@ -261,7 +261,7 @@
Message msg = DMT.createFNPInsertReply(uid);
sentSuccess = true;
try {
- source.send(msg);
+ source.send(msg, null);
} catch (NotConnectedException e) {
Logger.minor(this, "Lost connection to
source");
return;
@@ -275,7 +275,7 @@
Logger.error(this, "Unknown status code:
"+sender.getStatusString());
Message msg = DMT.createFNPRejectedOverload(uid, true);
try {
- source.send(msg);
+ source.send(msg, null);
} catch (NotConnectedException e) {
// Ignore
}
Modified: trunk/freenet/src/freenet/node/SSKInsertSender.java
===================================================================
--- trunk/freenet/src/freenet/node/SSKInsertSender.java 2006-07-08 01:05:24 UTC
(rev 9499)
+++ trunk/freenet/src/freenet/node/SSKInsertSender.java 2006-07-08 01:42:43 UTC
(rev 9500)
@@ -24,7 +24,7 @@
* wait for a long data-transfer timeout.
* - SSKs have pubkeys, which don't always need to be sent.
*/
-public class SSKInsertSender implements Runnable, AnyInsertSender {
+public class SSKInsertSender implements Runnable, AnyInsertSender, ByteCounter
{
// Constants
static final int ACCEPTED_TIMEOUT = 10000;
@@ -172,7 +172,7 @@
// Send to next node
try {
- next.sendAsync(req, null, 0);
+ next.sendAsync(req, null, 0, this);
} catch (NotConnectedException e1) {
Logger.minor(this, "Not connected to "+next);
continue;
@@ -190,7 +190,7 @@
while (true) {
try {
- msg = node.usm.waitFor(mf);
+ msg = node.usm.waitFor(mf, null); // NOTE(review): null here although this class passes `this` to sendAsync(); if the 2nd argument is the ByteCounter, this reply is never added to totalBytesReceived - confirm whether `this` was intended
} catch (DisconnectedException e) {
Logger.normal(this, "Disconnected from
" + next
+ " while waiting for
Accepted");
@@ -245,7 +245,7 @@
if(msg.getBoolean(DMT.NEED_PUB_KEY)) {
Message pkMsg = DMT.createFNPSSKPubKey(uid, pubKey.asBytes());
try {
- next.sendAsync(pkMsg, null, 0);
+ next.sendAsync(pkMsg, null, 0, this);
} catch (NotConnectedException e) {
Logger.minor(this, "Node disconnected while sending
pubkey: "+next);
continue;
@@ -257,7 +257,7 @@
Message newAck;
try {
- newAck = node.usm.waitFor(mf1);
+ newAck = node.usm.waitFor(mf1, null); // NOTE(review): null here although this class passes `this` to sendAsync(); if the 2nd argument is the ByteCounter, this ack is never added to totalBytesReceived - confirm whether `this` was intended
} catch (DisconnectedException e) {
Logger.minor(this, "Disconnected from
"+next);
htl--;
@@ -299,7 +299,7 @@
while (true) {
try {
- msg = node.usm.waitFor(mf);
+ msg = node.usm.waitFor(mf, null); // NOTE(review): null here although this class passes `this` to sendAsync(); if the 2nd argument is the ByteCounter, this reply is never added to totalBytesReceived - confirm whether `this` was intended
} catch (DisconnectedException e) {
Logger.normal(this, "Disconnected from
" + next
+ " while waiting for
InsertReply on " + this);
@@ -456,6 +456,15 @@
notifyAll();
}
+ if(code != TIMED_OUT && code != GENERATED_REJECTED_OVERLOAD && code != INTERNAL_ERROR
+ && code != ROUTE_REALLY_NOT_FOUND) { // these four outcomes are excluded from the cost averages
+ Logger.minor(this, "SSK insert cost "+getTotalSentBytes()+"/"+getTotalReceivedBytes()+" bytes ("+code+")");
+ (source == null ? node.localSskInsertBytesSentAverage : node.remoteSskInsertBytesSentAverage) // no source => local* average, otherwise remote*
+ .report(getTotalSentBytes()); // SSK insert averages, not CHK: this sender's totals must not skew the CHK insert estimates
+ (source == null ? node.localSskInsertBytesReceivedAverage : node.remoteSskInsertBytesReceivedAverage)
+ .report(getTotalReceivedBytes()); // received side goes into the matching SSK insert average
+ }
+
Logger.minor(this, "Set status code: "+getStatusString());
// Nothing to wait for, no downstream transfers, just exit.
}
@@ -523,4 +532,33 @@
return uid;
}
+ private final Object totalBytesSync = new Object(); // lock object guarding both totalBytesSent and totalBytesReceived
+ private int totalBytesSent; // running sum of the values passed to sentBytes(); only touched under totalBytesSync
+
+ public void sentBytes(int x) { // adds x to the sent-bytes total; presumably the ByteCounter callback for messages sent with sendAsync(..., this) - confirm against ByteCounter
+ synchronized(totalBytesSync) {
+ totalBytesSent += x;
+ }
+ }
+
+ public int getTotalSentBytes() { // returns the sent-bytes total accumulated so far, read under totalBytesSync
+ synchronized(totalBytesSync) {
+ return totalBytesSent;
+ }
+ }
+
+ private int totalBytesReceived; // running sum of the values passed to receivedBytes(); shares the totalBytesSync lock
+
+ public void receivedBytes(int x) { // adds x to the received-bytes total; presumably the receive-side ByteCounter callback - confirm against ByteCounter
+ synchronized(totalBytesSync) {
+ totalBytesReceived += x;
+ }
+ }
+
+ public int getTotalReceivedBytes() { // returns the received-bytes total accumulated so far, read under totalBytesSync
+ synchronized(totalBytesSync) {
+ return totalBytesReceived;
+ }
+ }
+
}
Modified: trunk/freenet/src/freenet/node/SendMessageOnErrorCallback.java
===================================================================
--- trunk/freenet/src/freenet/node/SendMessageOnErrorCallback.java
2006-07-08 01:05:24 UTC (rev 9499)
+++ trunk/freenet/src/freenet/node/SendMessageOnErrorCallback.java
2006-07-08 01:42:43 UTC (rev 9500)
@@ -34,7 +34,7 @@
public void disconnected() {
Logger.minor(this, "Disconnect trigger: "+this);
try {
- dest.sendAsync(msg, null, 0);
+ dest.sendAsync(msg, null, 0, null);
} catch (NotConnectedException e) {
Logger.minor(this, "Both source and destination disconnected:
"+msg+" for "+this);
}
Modified: trunk/freenet/src/freenet/node/Version.java
===================================================================
--- trunk/freenet/src/freenet/node/Version.java 2006-07-08 01:05:24 UTC (rev
9499)
+++ trunk/freenet/src/freenet/node/Version.java 2006-07-08 01:42:43 UTC (rev
9500)
@@ -18,7 +18,7 @@
public static final String protocolVersion = "1.0";
/** The build number of the current revision */
- private static final int buildNumber = 862;
+ private static final int buildNumber = 863;
/** Oldest build of Fred we will talk to */
private static final int oldLastGoodBuild = 839;
Modified: trunk/freenet/src/freenet/support/DoubleTokenBucket.java
===================================================================
--- trunk/freenet/src/freenet/support/DoubleTokenBucket.java 2006-07-08
01:05:24 UTC (rev 9499)
+++ trunk/freenet/src/freenet/support/DoubleTokenBucket.java 2006-07-08
01:42:43 UTC (rev 9500)
@@ -81,10 +81,6 @@
if(current > max) current = max;
}
- public synchronized long getNanosPerTick() {
- return nanosPerTick;
- }
-
public synchronized long getSize() {
return max;
}
Modified: trunk/freenet/src/freenet/support/TokenBucket.java
===================================================================
--- trunk/freenet/src/freenet/support/TokenBucket.java 2006-07-08 01:05:24 UTC
(rev 9499)
+++ trunk/freenet/src/freenet/support/TokenBucket.java 2006-07-08 01:42:43 UTC
(rev 9500)
@@ -184,4 +184,8 @@
}
return (nowNS - nextTick) / nanosPerTick;
}
+
+ public synchronized long getNanosPerTick() { // returns nanosPerTick while holding this object's monitor; presumably nanoseconds per token - confirm
+ return nanosPerTick;
+ }
}