Author: toad
Date: 2008-09-19 17:39:53 +0000 (Fri, 19 Sep 2008)
New Revision: 22708
Modified:
trunk/freenet/src/freenet/io/comm/MessageCore.java
trunk/freenet/src/freenet/io/comm/PeerContext.java
trunk/freenet/src/freenet/io/xfer/BulkReceiver.java
trunk/freenet/src/freenet/io/xfer/BulkTransmitter.java
trunk/freenet/src/freenet/io/xfer/PacketThrottle.java
trunk/freenet/src/freenet/node/AnnounceSender.java
trunk/freenet/src/freenet/node/CHKInsertHandler.java
trunk/freenet/src/freenet/node/CHKInsertSender.java
trunk/freenet/src/freenet/node/DarknetPeerNode.java
trunk/freenet/src/freenet/node/FNPPacketMangler.java
trunk/freenet/src/freenet/node/FailureTable.java
trunk/freenet/src/freenet/node/KeyTracker.java
trunk/freenet/src/freenet/node/LocationManager.java
trunk/freenet/src/freenet/node/MessageItem.java
trunk/freenet/src/freenet/node/NetworkIDManager.java
trunk/freenet/src/freenet/node/Node.java
trunk/freenet/src/freenet/node/NodeDispatcher.java
trunk/freenet/src/freenet/node/OpennetManager.java
trunk/freenet/src/freenet/node/OutgoingPacketMangler.java
trunk/freenet/src/freenet/node/PacketSender.java
trunk/freenet/src/freenet/node/PeerManager.java
trunk/freenet/src/freenet/node/PeerNode.java
trunk/freenet/src/freenet/node/RequestHandler.java
trunk/freenet/src/freenet/node/RequestSender.java
trunk/freenet/src/freenet/node/ResettingHTLProbeRequestHandler.java
trunk/freenet/src/freenet/node/SSKInsertHandler.java
trunk/freenet/src/freenet/node/SSKInsertSender.java
trunk/freenet/src/freenet/node/SendMessageOnErrorCallback.java
trunk/freenet/src/freenet/node/updater/NodeUpdateManager.java
trunk/freenet/src/freenet/node/updater/UpdateOverMandatoryManager.java
Log:
Implement hard bandwidth limiting in PacketSender, covering ALL packets
(except for disconnection called from PacketSender).
Get rid of alreadyReportedBytes.
Modified: trunk/freenet/src/freenet/io/comm/MessageCore.java
===================================================================
--- trunk/freenet/src/freenet/io/comm/MessageCore.java 2008-09-19 16:53:54 UTC
(rev 22707)
+++ trunk/freenet/src/freenet/io/comm/MessageCore.java 2008-09-19 17:39:53 UTC
(rev 22708)
@@ -502,7 +502,7 @@
Logger.error(this, "Trying to send internal-only message "+m+"
of spec "+m.getSpec(), new Exception("debug"));
return;
}
- destination.sendAsync(m, null, 0, ctr);
+ destination.sendAsync(m, null, ctr);
}
public void setDispatcher(Dispatcher d) {
Modified: trunk/freenet/src/freenet/io/comm/PeerContext.java
===================================================================
--- trunk/freenet/src/freenet/io/comm/PeerContext.java 2008-09-19 16:53:54 UTC
(rev 22707)
+++ trunk/freenet/src/freenet/io/comm/PeerContext.java 2008-09-19 17:39:53 UTC
(rev 22708)
@@ -34,7 +34,7 @@
int getVersionNumber();
/** Send a message to the node */
- public void sendAsync(Message msg, AsyncMessageCallback cb, int
alreadyReportedBytes, ByteCounter ctr) throws NotConnectedException;
+ public void sendAsync(Message msg, AsyncMessageCallback cb, ByteCounter
ctr) throws NotConnectedException;
/** Send a throttled message to the node (may block for a long time).
* @throws SyncSendWaitedTooLongException */
Modified: trunk/freenet/src/freenet/io/xfer/BulkReceiver.java
===================================================================
--- trunk/freenet/src/freenet/io/xfer/BulkReceiver.java 2008-09-19 16:53:54 UTC
(rev 22707)
+++ trunk/freenet/src/freenet/io/xfer/BulkReceiver.java 2008-09-19 17:39:53 UTC
(rev 22708)
@@ -48,7 +48,7 @@
sentCancel = true;
}
try {
- peer.sendAsync(DMT.createFNPBulkReceiveAborted(uid),
null, 0, ctr);
+ peer.sendAsync(DMT.createFNPBulkReceiveAborted(uid),
null, ctr);
} catch (NotConnectedException e) {
// Cool
}
@@ -64,7 +64,7 @@
MessageFilter mfPacket =
MessageFilter.create().setSource(peer).setType(DMT.FNPBulkPacketSend)
.setField(DMT.UID, uid).setTimeout(TIMEOUT);
if(prb.hasWholeFile()) {
try {
-
peer.sendAsync(DMT.createFNPBulkReceivedAll(uid), null, 0, ctr);
+
peer.sendAsync(DMT.createFNPBulkReceivedAll(uid), null, ctr);
} catch (NotConnectedException e) {
// Ignore, we have the data.
}
Modified: trunk/freenet/src/freenet/io/xfer/BulkTransmitter.java
===================================================================
--- trunk/freenet/src/freenet/io/xfer/BulkTransmitter.java 2008-09-19
16:53:54 UTC (rev 22707)
+++ trunk/freenet/src/freenet/io/xfer/BulkTransmitter.java 2008-09-19
17:39:53 UTC (rev 22708)
@@ -154,7 +154,7 @@
sentCancel = true;
}
try {
- peer.sendAsync(DMT.createFNPBulkSendAborted(uid), null,
0, ctr);
+ peer.sendAsync(DMT.createFNPBulkSendAborted(uid), null,
ctr);
} catch (NotConnectedException e) {
// Cool
}
Modified: trunk/freenet/src/freenet/io/xfer/PacketThrottle.java
===================================================================
--- trunk/freenet/src/freenet/io/xfer/PacketThrottle.java 2008-09-19
16:53:54 UTC (rev 22707)
+++ trunk/freenet/src/freenet/io/xfer/PacketThrottle.java 2008-09-19
17:39:53 UTC (rev 22708)
@@ -148,7 +148,7 @@
return ((PACKET_SIZE * 1000.0 / getDelay()));
}
- public void sendThrottledMessage(Message msg, PeerContext peer,
DoubleTokenBucket overallThrottle, int packetSize, ByteCounter ctr, long
deadline, boolean blockForSend) throws NotConnectedException,
ThrottleDeprecatedException, WaitedTooLongException,
SyncSendWaitedTooLongException {
+ public void sendThrottledMessage(Message msg, PeerContext peer, int
packetSize, ByteCounter ctr, long deadline, boolean blockForSend) throws
NotConnectedException, ThrottleDeprecatedException, WaitedTooLongException,
SyncSendWaitedTooLongException {
long start = System.currentTimeMillis();
long bootID = peer.getBootID();
synchronized(this) {
@@ -230,15 +230,7 @@
Logger.minor(this, "Congestion control wait time:
"+waitTime+" for "+this);
MyCallback callback = new MyCallback();
try {
- if(((PeerNode)peer).shouldThrottle()) {
- if(logMINOR) Logger.minor(this, "Throttling
"+peer.shortToString()+" : "+packetSize+" for "+this);
- long startTime = System.currentTimeMillis();
- overallThrottle.blockingGrab(packetSize);
- long delayTime = System.currentTimeMillis() -
startTime;
-
((PeerNode)peer).reportThrottledPacketSendTime(delayTime);
- } else if(logMINOR)
- Logger.minor(this, "Not throttling
"+peer.shortToString()+" for "+this);
- peer.sendAsync(msg, callback, packetSize, ctr);
+ peer.sendAsync(msg, callback, ctr);
ctr.sentPayload(packetSize);
if(blockForSend) {
synchronized(callback) {
Modified: trunk/freenet/src/freenet/node/AnnounceSender.java
===================================================================
--- trunk/freenet/src/freenet/node/AnnounceSender.java 2008-09-19 16:53:54 UTC
(rev 22707)
+++ trunk/freenet/src/freenet/node/AnnounceSender.java 2008-09-19 17:39:53 UTC
(rev 22708)
@@ -83,7 +83,7 @@
boolean hasForwarded = false;
if(source != null) {
try {
- source.sendAsync(DMT.createFNPAccepted(uid),
null, 0, this);
+ source.sendAsync(DMT.createFNPAccepted(uid),
null, this);
} catch (NotConnectedException e) {
return;
}
@@ -414,7 +414,7 @@
Message msg = DMT.createFNPRejectedOverload(uid, true);
if(source != null) {
try {
- source.sendAsync(msg, null, 0, this);
+ source.sendAsync(msg, null, this);
} catch (NotConnectedException e) {
// Ok
}
@@ -426,7 +426,7 @@
Message msg = DMT.createFNPRouteNotFound(uid, htl);
if(source != null) {
try {
- source.sendAsync(msg, null, 0, this);
+ source.sendAsync(msg, null, this);
} catch (NotConnectedException e) {
// Ok
}
@@ -441,7 +441,7 @@
Message msg = DMT.createFNPOpennetAnnounceCompleted(uid);
if(source != null) {
try {
- source.sendAsync(msg, null, 0, this);
+ source.sendAsync(msg, null, this);
} catch (NotConnectedException e) {
// Oh well.
}
@@ -495,7 +495,7 @@
private void sendNotWanted() throws NotConnectedException {
Message msg = DMT.createFNPOpennetAnnounceNodeNotWanted(uid);
- source.sendAsync(msg, null, 0, this);
+ source.sendAsync(msg, null, this);
}
private void sendOurRef(PeerNode next, byte[] ref) throws
NotConnectedException {
Modified: trunk/freenet/src/freenet/node/CHKInsertHandler.java
===================================================================
--- trunk/freenet/src/freenet/node/CHKInsertHandler.java 2008-09-19
16:53:54 UTC (rev 22707)
+++ trunk/freenet/src/freenet/node/CHKInsertHandler.java 2008-09-19
17:39:53 UTC (rev 22708)
@@ -113,9 +113,9 @@
if(source.isConnected() && (startTime >
(source.timeLastConnectionCompleted()+Node.HANDSHAKE_TIMEOUT*4)))
Logger.error(this, "Did not receive DataInsert
on "+uid+" from "+source+" !");
Message tooSlow = DMT.createFNPRejectedTimeout(uid);
- source.sendAsync(tooSlow, null, 0, this);
+ source.sendAsync(tooSlow, null, this);
Message m = DMT.createFNPInsertTransfersCompleted(uid,
true);
- source.sendAsync(m, null, 0, this);
+ source.sendAsync(m, null, this);
prb = new PartiallyReceivedBlock(Node.PACKETS_IN_BLOCK,
Node.PACKET_SIZE);
br = new BlockReceiver(node.usm, source, uid, prb,
this);
prb.abort(RetrievalException.NO_DATAINSERT, "No
DataInsert");
@@ -384,7 +384,7 @@
}
if(toSend != null) {
try {
- source.sendAsync(toSend, null, 0, this);
+ source.sendAsync(toSend, null, this);
} catch (NotConnectedException e) {
// :(
if(logMINOR) Logger.minor(this, "Lost connection in "+this+"
when sending FNPDataInsertRejected");
Modified: trunk/freenet/src/freenet/node/CHKInsertSender.java
===================================================================
--- trunk/freenet/src/freenet/node/CHKInsertSender.java 2008-09-19 16:53:54 UTC
(rev 22707)
+++ trunk/freenet/src/freenet/node/CHKInsertSender.java 2008-09-19 17:39:53 UTC
(rev 22708)
@@ -339,7 +339,7 @@
ACCEPTED_TIMEOUT, using sendAsync() will
skip them before they get the request. This would be a need for retuning
ACCEPTED_TIMEOUT.
*/
- next.sendAsync(req, null, 0, this);
+ next.sendAsync(req, null, this);
} catch (NotConnectedException e1) {
if(logMINOR) Logger.minor(this, "Not connected
to "+next);
continue;
Modified: trunk/freenet/src/freenet/node/DarknetPeerNode.java
===================================================================
--- trunk/freenet/src/freenet/node/DarknetPeerNode.java 2008-09-19 16:53:54 UTC
(rev 22707)
+++ trunk/freenet/src/freenet/node/DarknetPeerNode.java 2008-09-19 17:39:53 UTC
(rev 22708)
@@ -303,7 +303,7 @@
if(localRequest) {
Message msg = DMT.createRoutingStatus(shouldRoute);
try {
- sendAsync(msg, null, 0,
node.nodeStats.setRoutingStatusCtr);
+ sendAsync(msg, null,
node.nodeStats.setRoutingStatusCtr);
} catch(NotConnectedException e) {
// ok
}
@@ -1333,7 +1333,7 @@
Node.N2N_MESSAGE_TYPE_FPROXY, fs
.toString().getBytes("UTF-8"));
try {
- sendAsync(n2ntm, null, 0,
node.nodeStats.nodeToNodeCounter);
+ sendAsync(n2ntm, null,
node.nodeStats.nodeToNodeCounter);
} catch (NotConnectedException e) {
fs.removeValue("sentTime");
queueN2NM(fs);
@@ -1367,7 +1367,7 @@
Node.N2N_MESSAGE_TYPE_FPROXY, fs
.toString().getBytes("UTF-8"));
try {
- sendAsync(n2ntm, null, 0,
node.nodeStats.nodeToNodeCounter);
+ sendAsync(n2ntm, null,
node.nodeStats.nodeToNodeCounter);
} catch (NotConnectedException e) {
fs.removeValue("sentTime");
queueN2NM(fs);
@@ -1410,7 +1410,7 @@
Node.N2N_MESSAGE_TYPE_FPROXY, fs
.toString().getBytes("UTF-8"));
try {
- sendAsync(n2ntm, null, 0,
node.nodeStats.nodeToNodeCounter);
+ sendAsync(n2ntm, null,
node.nodeStats.nodeToNodeCounter);
} catch (NotConnectedException e) {
fs.removeValue("sentTime");
queueN2NM(fs);
@@ -1506,7 +1506,7 @@
if(fo == null) {
Logger.error(this, "No such offer: "+uid);
try {
- sendAsync(DMT.createFNPBulkSendAborted(uid),
null, 0, node.nodeStats.nodeToNodeCounter);
+ sendAsync(DMT.createFNPBulkSendAborted(uid),
null, node.nodeStats.nodeToNodeCounter);
} catch (NotConnectedException e) {
// Fine by me!
}
Modified: trunk/freenet/src/freenet/node/FNPPacketMangler.java
===================================================================
--- trunk/freenet/src/freenet/node/FNPPacketMangler.java 2008-09-19
16:53:54 UTC (rev 22707)
+++ trunk/freenet/src/freenet/node/FNPPacketMangler.java 2008-09-19
17:39:53 UTC (rev 22708)
@@ -1667,13 +1667,13 @@
System.arraycopy(random, 0, data,
hash.length+iv.length+2+output.length, random.length);
node.nodeStats.reportAuthBytes(data.length +
sock.getHeadersLength());
try {
- sendPacket(data, replyTo, pn, 0);
+ sendPacket(data, replyTo, pn);
} catch (LocalAddressException e) {
Logger.error(this, "Tried to send auth packet to local
address: "+replyTo+" for "+pn+" - maybe you should set allowLocalAddresses for
this peer??");
}
}
- private void sendPacket(byte[] data, Peer replyTo, PeerNode pn, int
alreadyReportedBytes) throws LocalAddressException {
+ private void sendPacket(byte[] data, Peer replyTo, PeerNode pn) throws
LocalAddressException {
if(pn != null) {
if(pn.isIgnoreSource()) {
Peer p = pn.getPeer();
@@ -1684,13 +1684,7 @@
if(pn != null)
pn.reportOutgoingPacket(data, 0, data.length,
System.currentTimeMillis());
if(PeerNode.shouldThrottle(replyTo, node)) {
- int reportableBytes = data.length -
alreadyReportedBytes;
- if(reportableBytes <= 0) {
- Logger.error(this, "alreadyReportedBytes
("+alreadyReportedBytes+")> data.length ("+data.length+")");
- reportableBytes = 0;
- }
- if(reportableBytes > 0)
- node.outputThrottle.forceGrab(reportableBytes);
+ node.outputThrottle.forceGrab(data.length);
}
}
@@ -2007,7 +2001,6 @@
}
if(logMINOR) Logger.minor(this, "processOutgoingOrRequeue
"+messages.length+" messages for "+pn+" ("+neverWaitForPacketNumber+ ')');
byte[][] messageData = new byte[messages.length][];
- int[] alreadyReported = new int[messages.length];
MessageItem[] newMsgs = new MessageItem[messages.length];
KeyTracker kt = pn.getCurrentKeyTracker();
if(kt == null) {
@@ -2028,7 +2021,7 @@
try {
byte[] buf = mi.getData();
int packetNumber =
kt.allocateOutgoingPacketNumberNeverBlock();
- int size =
processOutgoingPreformatted(buf, 0, buf.length, kt, packetNumber, mi.cb,
mi.alreadyReportedBytes, mi.getPriority());
+ int size =
processOutgoingPreformatted(buf, 0, buf.length, kt, packetNumber, mi.cb,
mi.getPriority());
//MARK: onSent()
mi.onSent(size);
} catch (NotConnectedException e) {
@@ -2072,10 +2065,9 @@
// Will be handled later
}
newMsgs[x] = mi;
- alreadyReported[x] = mi.alreadyReportedBytes;
x++;
if(mi.cb != null) callbacksCount +=
mi.cb.length;
- if(logMINOR) Logger.minor(this, "Sending:
"+mi+" length "+data.length+" cb "+ Arrays.toString(mi.cb)+" reported
"+mi.alreadyReportedBytes);
+ if(logMINOR) Logger.minor(this, "Sending:
"+mi+" length "+data.length+" cb "+ Arrays.toString(mi.cb));
length += (data.length + 2);
}
}
@@ -2090,12 +2082,10 @@
}
AsyncMessageCallback callbacks[] = new
AsyncMessageCallback[callbacksCount];
x=0;
- int alreadyReportedBytes = 0;
short priority = DMT.PRIORITY_BULK_DATA;
for(int i=0;i<messages.length;i++) {
if(messages[i].formatted) continue;
if(messages[i].cb != null) {
- alreadyReportedBytes +=
messages[i].alreadyReportedBytes;
System.arraycopy(messages[i].cb, 0, callbacks,
x, messages[i].cb.length);
x += messages[i].cb.length;
}
@@ -2108,7 +2098,7 @@
(messageData.length < 256)) {
mi_name = null;
try {
- int size = innerProcessOutgoing(messageData, 0,
messageData.length, length, pn, neverWaitForPacketNumber, callbacks,
alreadyReportedBytes, priority);
+ int size = innerProcessOutgoing(messageData, 0,
messageData.length, length, pn, neverWaitForPacketNumber, callbacks, priority);
int totalMessageSize = 0;
for(int i=0;i<messageData.length;i++)
totalMessageSize += messageData[i].length;
int overhead = size - totalMessageSize;
@@ -2146,7 +2136,6 @@
length += kt.countAcks() + kt.countAckRequests() +
kt.countResendRequests();
int count = 0;
int lastIndex = 0;
- alreadyReportedBytes = 0;
if(logMINOR)
Logger.minor(this, "Sending
"+messageData.length+" messages");
for(int i=0;i<=messageData.length;i++) {
@@ -2164,7 +2153,7 @@
mi_name = null;
try {
// FIXME regenerate
callbacks and priority!
- int size =
innerProcessOutgoing(messageData, lastIndex, i-lastIndex, length, pn,
neverWaitForPacketNumber, callbacks, alreadyReportedBytes, priority);
+ int size =
innerProcessOutgoing(messageData, lastIndex, i-lastIndex, length, pn,
neverWaitForPacketNumber, callbacks, priority);
int totalMessageSize =
0;
for(int
j=lastIndex;j<i;j++) totalMessageSize += messageData[j].length;
int overhead = size -
totalMessageSize;
@@ -2200,12 +2189,10 @@
lastIndex = i;
if(i != messageData.length) {
length = 1 +
(messageData[i].length + 2);
- alreadyReportedBytes =
alreadyReported[i];
}
count = 0;
} else {
length = newLength;
- alreadyReportedBytes +=
alreadyReported[i];
}
}
}
@@ -2221,8 +2208,8 @@
* @throws PacketSequenceException
*/
private int innerProcessOutgoing(byte[][] messageData, int start, int
length, int bufferLength,
- PeerNode pn, boolean neverWaitForPacketNumber,
AsyncMessageCallback[] callbacks, int alreadyReportedBytes, short priority)
throws NotConnectedException, WouldBlockException, PacketSequenceException {
- if(logMINOR) Logger.minor(this,
"innerProcessOutgoing(...,"+start+ ',' +length+ ',' +bufferLength+
','+callbacks.length+','+alreadyReportedBytes+')');
+ PeerNode pn, boolean neverWaitForPacketNumber,
AsyncMessageCallback[] callbacks, short priority) throws NotConnectedException,
WouldBlockException, PacketSequenceException {
+ if(logMINOR) Logger.minor(this,
"innerProcessOutgoing(...,"+start+ ',' +length+ ',' +bufferLength+
','+callbacks.length+')');
byte[] buf = new byte[bufferLength];
buf[0] = (byte)length;
int loc = 1;
@@ -2234,15 +2221,15 @@
System.arraycopy(data, 0, buf, loc, len);
loc += len;
}
- return processOutgoingPreformatted(buf, 0, loc, pn,
neverWaitForPacketNumber, callbacks, alreadyReportedBytes, priority);
+ return processOutgoingPreformatted(buf, 0, loc, pn,
neverWaitForPacketNumber, callbacks, priority);
}
/* (non-Javadoc)
* @see freenet.node.OutgoingPacketMangler#processOutgoing(byte[], int,
int, freenet.node.KeyTracker, int)
*/
- public int processOutgoing(byte[] buf, int offset, int length,
KeyTracker tracker, int alreadyReportedBytes, short priority) throws
KeyChangedException, NotConnectedException, PacketSequenceException,
WouldBlockException {
+ public int processOutgoing(byte[] buf, int offset, int length,
KeyTracker tracker, short priority) throws KeyChangedException,
NotConnectedException, PacketSequenceException, WouldBlockException {
byte[] newBuf = preformat(buf, offset, length);
- return processOutgoingPreformatted(newBuf, 0, newBuf.length,
tracker, -1, null, alreadyReportedBytes, priority);
+ return processOutgoingPreformatted(newBuf, 0, newBuf.length,
tracker, -1, null, priority);
}
/**
@@ -2250,7 +2237,7 @@
* the key changes.
* @throws PacketSequenceException
*/
- int processOutgoingPreformatted(byte[] buf, int offset, int length,
PeerNode peer, boolean neverWaitForPacketNumber, AsyncMessageCallback[]
callbacks, int alreadyReportedBytes, short priority) throws
NotConnectedException, WouldBlockException, PacketSequenceException {
+ int processOutgoingPreformatted(byte[] buf, int offset, int length,
PeerNode peer, boolean neverWaitForPacketNumber, AsyncMessageCallback[]
callbacks, short priority) throws NotConnectedException, WouldBlockException,
PacketSequenceException {
KeyTracker last = null;
while(true) {
try {
@@ -2264,7 +2251,7 @@
}
int seqNo = neverWaitForPacketNumber ?
tracker.allocateOutgoingPacketNumberNeverBlock() :
tracker.allocateOutgoingPacketNumber();
- return processOutgoingPreformatted(buf, offset,
length, tracker, seqNo, callbacks, alreadyReportedBytes, priority);
+ return processOutgoingPreformatted(buf, offset,
length, tracker, seqNo, callbacks, priority);
} catch (KeyChangedException e) {
Logger.normal(this, "Key changed(2) for
"+peer.getPeer());
if(last == peer.getCurrentKeyTracker()) {
@@ -2296,7 +2283,7 @@
/* (non-Javadoc)
* @see
freenet.node.OutgoingPacketMangler#processOutgoingPreformatted(byte[], int,
int, freenet.node.KeyTracker, int, freenet.node.AsyncMessageCallback[], int)
*/
- public int processOutgoingPreformatted(byte[] buf, int offset, int
length, KeyTracker tracker, int packetNumber, AsyncMessageCallback[] callbacks,
int alreadyReportedBytes, short priority) throws KeyChangedException,
NotConnectedException, PacketSequenceException, WouldBlockException {
+ public int processOutgoingPreformatted(byte[] buf, int offset, int
length, KeyTracker tracker, int packetNumber, AsyncMessageCallback[] callbacks,
short priority) throws KeyChangedException, NotConnectedException,
PacketSequenceException, WouldBlockException {
if(logMINOR) {
String log =
"processOutgoingPreformatted("+Fields.hashCode(buf)+", "+offset+ ',' +length+
',' +tracker+ ',' +packetNumber+ ',';
if(callbacks == null) log += "null";
@@ -2527,7 +2514,7 @@
if(logMINOR) Logger.minor(this, "Sending... "+seqNumber);
- int ret = processOutgoingFullyFormatted(plaintext, tracker,
alreadyReportedBytes);
+ int ret = processOutgoingFullyFormatted(plaintext, tracker);
if(logMINOR) Logger.minor(this, "Sent packet "+seqNumber);
return ret;
}
@@ -2537,7 +2524,7 @@
* @param plaintext The packet's plaintext, including all formatting,
* including acks and resend requests. Is clobbered.
*/
- private int processOutgoingFullyFormatted(byte[] plaintext, KeyTracker
kt, int alreadyReportedBytes) {
+ private int processOutgoingFullyFormatted(byte[] plaintext, KeyTracker
kt) {
BlockCipher sessionCipher = kt.sessionCipher;
if(logMINOR) Logger.minor(this, "Encrypting with
"+HexUtil.bytesToHex(kt.sessionKey));
if(sessionCipher == null) {
@@ -2592,7 +2579,7 @@
// pn.getPeer() cannot be null
try {
- sendPacket(output, kt.pn.getPeer(), kt.pn,
alreadyReportedBytes);
+ sendPacket(output, kt.pn.getPeer(), kt.pn);
// System.err.println(kt.pn.getIdentityString()+" : sent
packet length "+output.length);
} catch (LocalAddressException e) {
Logger.error(this, "Tried to send data packet to local
address: "+kt.pn.getPeer()+" for "+kt.pn.allowLocalAddresses());
@@ -2634,7 +2621,7 @@
}
public void resend(ResendPacketItem item) throws
PacketSequenceException, WouldBlockException, KeyChangedException,
NotConnectedException {
- int size = processOutgoingPreformatted(item.buf, 0,
item.buf.length, item.kt, item.packetNumber, item.callbacks, 0, item.priority);
+ int size = processOutgoingPreformatted(item.buf, 0,
item.buf.length, item.kt, item.packetNumber, item.callbacks, item.priority);
item.pn.resendByteCounter.sentBytes(size);
}
Modified: trunk/freenet/src/freenet/node/FailureTable.java
===================================================================
--- trunk/freenet/src/freenet/node/FailureTable.java 2008-09-19 16:53:54 UTC
(rev 22707)
+++ trunk/freenet/src/freenet/node/FailureTable.java 2008-09-19 17:39:53 UTC
(rev 22708)
@@ -402,7 +402,7 @@
SSKBlock block = node.fetch((NodeSSK)key, false);
if(block == null) {
// Don't have the key
-
source.sendAsync(DMT.createFNPGetOfferedKeyInvalid(uid,
DMT.GET_OFFERED_KEY_REJECTED_NO_KEY), null, 0, senderCounter);
+
source.sendAsync(DMT.createFNPGetOfferedKeyInvalid(uid,
DMT.GET_OFFERED_KEY_REJECTED_NO_KEY), null, senderCounter);
return;
}
@@ -410,7 +410,7 @@
Message headers = DMT.createFNPSSKDataFoundHeaders(uid,
block.getRawHeaders());
final int dataLength = block.getRawData().length;
- source.sendAsync(headers, null, 0, senderCounter);
+ source.sendAsync(headers, null, senderCounter);
node.executor.execute(new PrioRunnable() {
@@ -437,21 +437,21 @@
if(RequestHandler.SEND_OLD_FORMAT_SSK) {
Message df = DMT.createFNPSSKDataFound(uid,
block.getRawHeaders(), block.getRawData());
- source.sendAsync(df, null, 0, senderCounter);
+ source.sendAsync(df, null, senderCounter);
}
if(needPubKey) {
Message pk = DMT.createFNPSSKPubKey(uid,
block.getPubKey());
- source.sendAsync(pk, null, 0, senderCounter);
+ source.sendAsync(pk, null, senderCounter);
}
} else {
CHKBlock block = node.fetch((NodeCHK)key, false);
if(block == null) {
// Don't have the key
-
source.sendAsync(DMT.createFNPGetOfferedKeyInvalid(uid,
DMT.GET_OFFERED_KEY_REJECTED_NO_KEY), null, 0, senderCounter);
+
source.sendAsync(DMT.createFNPGetOfferedKeyInvalid(uid,
DMT.GET_OFFERED_KEY_REJECTED_NO_KEY), null, senderCounter);
return;
}
Message df = DMT.createFNPCHKDataFound(uid,
block.getRawHeaders());
- source.sendAsync(df, null, 0, senderCounter);
+ source.sendAsync(df, null, senderCounter);
PartiallyReceivedBlock prb =
new PartiallyReceivedBlock(Node.PACKETS_IN_BLOCK,
Node.PACKET_SIZE, block.getRawData());
final BlockTransmitter bt =
Modified: trunk/freenet/src/freenet/node/KeyTracker.java
===================================================================
--- trunk/freenet/src/freenet/node/KeyTracker.java 2008-09-19 16:53:54 UTC
(rev 22707)
+++ trunk/freenet/src/freenet/node/KeyTracker.java 2008-09-19 17:39:53 UTC
(rev 22708)
@@ -985,7 +985,7 @@
// Ignore packet#
if(logMINOR)
Logger.minor(this, "Queueing resend of what was
once " + element.packetNumber);
- messages[i] = new MessageItem(buf, callbacks, true, 0,
pn.resendByteCounter, element.priority);
+ messages[i] = new MessageItem(buf, callbacks, true,
pn.resendByteCounter, element.priority);
}
pn.requeueMessageItems(messages, 0, messages.length, true);
Modified: trunk/freenet/src/freenet/node/LocationManager.java
===================================================================
--- trunk/freenet/src/freenet/node/LocationManager.java 2008-09-19 16:53:54 UTC
(rev 22707)
+++ trunk/freenet/src/freenet/node/LocationManager.java 2008-09-19 17:39:53 UTC
(rev 22708)
@@ -861,7 +861,7 @@
// Reject
Message reject = DMT.createFNPSwapRejected(oldID);
try {
- pn.sendAsync(reject, null, 0, this);
+ pn.sendAsync(reject, null, this);
} catch (NotConnectedException e1) {
if(logMINOR) Logger.minor(this, "Lost connection rejecting
SwapRequest (locked) from "+pn);
}
@@ -889,7 +889,7 @@
// Reject
Message reject = DMT.createFNPSwapRejected(oldID);
try {
- pn.sendAsync(reject, null, 0, this);
+ pn.sendAsync(reject, null, this);
} catch (NotConnectedException e) {
if(logMINOR) Logger.minor(this, "Lost connection to "+pn+"
rejecting SwapRequest");
}
@@ -901,7 +901,7 @@
// Reject
Message reject = DMT.createFNPSwapRejected(oldID);
try {
- pn.sendAsync(reject, null, 0, this);
+ pn.sendAsync(reject, null, this);
} catch (NotConnectedException e) {
if(logMINOR) Logger.minor(this, "Lost connection rejecting
SwapRequest from "+pn);
}
@@ -919,7 +919,7 @@
// Reject
Message reject = DMT.createFNPSwapRejected(oldID);
try {
- pn.sendAsync(reject, null, 0, this);
+ pn.sendAsync(reject, null, this);
} catch (NotConnectedException e1) {
if(logMINOR) Logger.minor(this, "Lost connection rejecting
SwapRequest (locked) from "+pn);
}
@@ -942,7 +942,7 @@
if(logMINOR) Logger.minor(this, "Late reject "+oldID);
Message reject = DMT.createFNPSwapRejected(oldID);
try {
- pn.sendAsync(reject, null, 0, this);
+ pn.sendAsync(reject, null, this);
} catch (NotConnectedException e1) {
Logger.normal(this, "Late reject but disconnected from
sender: "+pn);
}
@@ -956,7 +956,7 @@
// Forward the request.
// Note that we MUST NOT send this blocking as we are on
the
// receiver thread.
- randomPeer.sendAsync(m, new
MyCallback(DMT.createFNPSwapRejected(oldID), pn, item), 0,
LocationManager.this);
+ randomPeer.sendAsync(m, new
MyCallback(DMT.createFNPSwapRejected(oldID), pn, item), LocationManager.this);
} catch (NotConnectedException e) {
if(logMINOR) Logger.minor(this, "Not connected");
// Try a different node
@@ -1001,7 +1001,7 @@
if(logMINOR) Logger.minor(this, "Rejecting "+msg);
Message rejected = DMT.createFNPSwapRejected(oldID);
try {
- pn.sendAsync(rejected, null, 0, this);
+ pn.sendAsync(rejected, null, this);
} catch (NotConnectedException e1) {
if(logMINOR) Logger.minor(this, "Lost connection rejecting
SwapRequest (locked) from "+pn);
}
@@ -1065,7 +1065,7 @@
m.set(DMT.UID, item.incomingID);
if(logMINOR) Logger.minor(this, "Forwarding SwapReply "+uid+" from
"+source+" to "+item.requestSender);
try {
- item.requestSender.sendAsync(m, null, 0, this);
+ item.requestSender.sendAsync(m, null, this);
} catch (NotConnectedException e) {
if(logMINOR) Logger.minor(this, "Lost connection forwarding
SwapReply "+uid+" to "+item.requestSender);
}
@@ -1099,7 +1099,7 @@
// Returning to source - use incomingID
m.set(DMT.UID, item.incomingID);
try {
- item.requestSender.sendAsync(m, null, 0, this);
+ item.requestSender.sendAsync(m, null, this);
} catch (NotConnectedException e) {
if(logMINOR) Logger.minor(this, "Lost connection forwarding
SwapRejected "+uid+" to "+item.requestSender);
}
@@ -1125,7 +1125,7 @@
// Sending onwards - use outgoing ID
m.set(DMT.UID, item.outgoingID);
try {
- item.routedTo.sendAsync(m, new
SendMessageOnErrorCallback(DMT.createFNPSwapRejected(item.incomingID),
item.requestSender, this), 0, this);
+ item.routedTo.sendAsync(m, new
SendMessageOnErrorCallback(DMT.createFNPSwapRejected(item.incomingID),
item.requestSender, this), this);
} catch (NotConnectedException e) {
if(logMINOR) Logger.minor(this, "Lost connection forwarding
SwapCommit "+uid+" to "+item.routedTo);
}
@@ -1162,7 +1162,7 @@
// Returning to source - use incomingID
m.set(DMT.UID, item.incomingID);
try {
- item.requestSender.sendAsync(m, null, 0, this);
+ item.requestSender.sendAsync(m, null, this);
} catch (NotConnectedException e) {
Logger.normal(this, "Lost connection forwarding SwapComplete
"+uid+" to "+item.requestSender);
}
@@ -1278,7 +1278,7 @@
Message msg = DMT.createFNPSwapRejected(item.incomingID);
if(logMINOR) Logger.minor(this, "Rejecting in lostOrRestartedNode:
"+item.incomingID+ " from "+item.requestSender);
try {
- item.requestSender.sendAsync(msg, null, 0, this);
+ item.requestSender.sendAsync(msg, null, this);
} catch (NotConnectedException e1) {
Logger.normal(this, "Both sender and receiver disconnected for
"+item);
}
Modified: trunk/freenet/src/freenet/node/MessageItem.java
===================================================================
--- trunk/freenet/src/freenet/node/MessageItem.java 2008-09-19 16:53:54 UTC
(rev 22707)
+++ trunk/freenet/src/freenet/node/MessageItem.java 2008-09-19 17:39:53 UTC
(rev 22708)
@@ -17,7 +17,6 @@
final byte[] buf;
final AsyncMessageCallback[] cb;
final long submitted;
- final int alreadyReportedBytes;
/** If true, the buffer may contain several messages, and is formatted
* for sending as a single packet.
*/
@@ -25,8 +24,7 @@
final ByteCounter ctrCallback;
private final short priority;
- public MessageItem(Message msg2, AsyncMessageCallback[] cb2, int
alreadyReportedBytes, ByteCounter ctr, PeerNode pn) {
- this.alreadyReportedBytes = alreadyReportedBytes;
+ public MessageItem(Message msg2, AsyncMessageCallback[] cb2, ByteCounter
ctr, PeerNode pn) {
this.msg = msg2;
this.cb = cb2;
formatted = false;
@@ -34,13 +32,9 @@
this.submitted = System.currentTimeMillis();
priority = msg2.getSpec().getPriority();
buf = msg.encodeToPacket(pn);
- if(buf.length <= alreadyReportedBytes) {
- Logger.error(this, "buf.length = "+buf.length+" but
alreadyReportedBytes = "+alreadyReportedBytes+" on "+this);
- }
}
- public MessageItem(byte[] data, AsyncMessageCallback[] cb2, boolean
formatted, int alreadyReportedBytes, ByteCounter ctr, short priority) {
- this.alreadyReportedBytes = alreadyReportedBytes;
+ public MessageItem(byte[] data, AsyncMessageCallback[] cb2, boolean
formatted, ByteCounter ctr, short priority) {
this.cb = cb2;
this.msg = null;
this.buf = data;
@@ -93,7 +87,7 @@
@Override
public String toString() {
- return
super.toString()+":formatted="+formatted+",msg="+msg+",alreadyReported="+alreadyReportedBytes;
+ return super.toString()+":formatted="+formatted+",msg="+msg;
}
public void onDisconnect() {
Modified: trunk/freenet/src/freenet/node/NetworkIDManager.java
===================================================================
--- trunk/freenet/src/freenet/node/NetworkIDManager.java 2008-09-19
16:53:54 UTC (rev 22707)
+++ trunk/freenet/src/freenet/node/NetworkIDManager.java 2008-09-19
17:39:53 UTC (rev 22708)
@@ -101,7 +101,7 @@
if (logMINOR) Logger.minor(this, "Storing secret: "+s);
addOrReplaceSecret(s); // FIXME - what if the message contain a
bogus UID?
try {
- pn.sendAsync(DMT.createFNPAccepted(uid), null, 0, ctr);
+ pn.sendAsync(DMT.createFNPAccepted(uid), null, ctr);
} catch (NotConnectedException e) {
Logger.error(this, "peer disconnected before
storeSecret ack?", e);
}
@@ -133,7 +133,7 @@
if (disableSecretPings || node.recentlyCompleted(uid)) {
if (logMINOR) Logger.minor(this, "recently
complete/loop: "+uid);
- source.sendAsync(DMT.createFNPRejectedLoop(uid), null,
0, ctr);
+ source.sendAsync(DMT.createFNPRejectedLoop(uid), null,
ctr);
} else {
byte[] nodeIdentity = ((ShortBuffer)
m.getObject(DMT.NODE_IDENTITY)).getData();
StoredSecret match;
@@ -145,10 +145,10 @@
//This is the node that the ping intends to
reach, we will *not* forward it; but we might not respond positively either.
//don't set the completed flag, we might reject
it from one peer (too short a path) and accept it from another.
if (htl > dawnHtl) {
-
source.sendAsync(DMT.createFNPRejectedLoop(uid), null, 0, ctr);
+
source.sendAsync(DMT.createFNPRejectedLoop(uid), null, ctr);
} else {
if (logMINOR) Logger.minor(this,
"Responding to "+source+" with "+match+" from "+match.peer);
-
source.sendAsync(match.getSecretPong(counter+1), null, 0, ctr);
+
source.sendAsync(match.getSecretPong(counter+1), null, ctr);
}
} else {
//Set the completed flag immediately for
determining reject loops rather than locking the uid.
@@ -169,7 +169,7 @@
if (next==null) {
//would be rnf... but this is a
more exhaustive and lightweight search I suppose.
-
source.sendAsync(DMT.createFNPRejectedLoop(uid), null, 0, ctr);
+
source.sendAsync(DMT.createFNPRejectedLoop(uid), null, ctr);
break;
}
@@ -177,7 +177,7 @@
if (htl<=0) {
//would be dnf if we were
looking for data.
-
source.sendAsync(DMT.createFNPRejectedLoop(uid), null, 0, ctr);
+
source.sendAsync(DMT.createFNPRejectedLoop(uid), null, ctr);
break;
}
@@ -188,7 +188,7 @@
counter++;
routedTo.add(next);
try {
-
next.sendAsync(DMT.createFNPSecretPing(uid, target, htl, dawnHtl, counter,
nodeIdentity), null, 0, ctr);
+
next.sendAsync(DMT.createFNPSecretPing(uid, target, htl, dawnHtl, counter,
nodeIdentity), null, ctr);
} catch (NotConnectedException e) {
Logger.normal(this, next+"
disconnected before secret-ping-forward");
continue;
@@ -218,7 +218,7 @@
counter=suppliedCounter;
long
secret=msg.getLong(DMT.SECRET);
if (logMINOR)
Logger.minor(this, node+" forwarding apparently-successful secretpong response:
"+counter+"/"+secret+" from "+next+" to "+source);
-
source.sendAsync(DMT.createFNPSecretPong(uid, counter, secret), null, 0, ctr);
+
source.sendAsync(DMT.createFNPSecretPong(uid, counter, secret), null, ctr);
break;
}
Modified: trunk/freenet/src/freenet/node/Node.java
===================================================================
--- trunk/freenet/src/freenet/node/Node.java 2008-09-19 16:53:54 UTC (rev
22707)
+++ trunk/freenet/src/freenet/node/Node.java 2008-09-19 17:39:53 UTC (rev
22708)
@@ -99,7 +99,6 @@
import freenet.store.SSKStore;
import freenet.store.saltedhash.SaltedHashFreenetStore;
import freenet.support.ByteArrayWrapper;
-import freenet.support.DoubleTokenBucket;
import freenet.support.Executor;
import freenet.support.Fields;
import freenet.support.FileLoggerHook;
@@ -113,6 +112,7 @@
import freenet.support.PooledExecutor;
import freenet.support.ShortBuffer;
import freenet.support.SimpleFieldSet;
+import freenet.support.TokenBucket;
import freenet.support.api.BooleanCallback;
import freenet.support.api.IntCallback;
import freenet.support.api.LongCallback;
@@ -421,7 +421,7 @@
final LRUHashtable<ByteArrayWrapper, DSAPublicKey> cachedPubKeys;
final boolean testnetEnabled;
final TestnetHandler testnetHandler;
- public final DoubleTokenBucket outputThrottle;
+ public final TokenBucket outputThrottle;
public boolean throttleLocalData;
private int outputBandwidthLimit;
private int inputBandwidthLimit;
@@ -1113,7 +1113,7 @@
// Add them at a rate determined by the obwLimit.
// Maximum forced bytes 80%, in other words, 20% of the
bandwidth is reserved for
// block transfers, so we will use that 20% for block transfers
even if more than 80% of the limit is used for non-limited data (resends etc).
- outputThrottle = new DoubleTokenBucket(obwLimit/2,
(1000L*1000L*1000L) / obwLimit, obwLimit/2, 0.8);
+ outputThrottle = new TokenBucket(obwLimit/2,
(1000L*1000L*1000L) / obwLimit, obwLimit/2);
nodeConfig.register("inputBandwidthLimit", "-1", sortOrder++,
false, true, "Node.inBWLimit", "Node.inBWLimitLong", new IntCallback() {
@Override
Modified: trunk/freenet/src/freenet/node/NodeDispatcher.java
===================================================================
--- trunk/freenet/src/freenet/node/NodeDispatcher.java 2008-09-19 16:53:54 UTC
(rev 22707)
+++ trunk/freenet/src/freenet/node/NodeDispatcher.java 2008-09-19 17:39:53 UTC
(rev 22708)
@@ -93,7 +93,7 @@
// Send an FNPPong
Message reply =
DMT.createFNPPong(m.getInt(DMT.PING_SEQNO));
try {
- source.sendAsync(reply, null, 0, pingCounter);
// nothing we can do if can't contact source
+ source.sendAsync(reply, null, pingCounter); //
nothing we can do if can't contact source
} catch (NotConnectedException e) {
if(logMINOR) Logger.minor(this, "Lost
connection replying to "+m);
}
@@ -247,7 +247,7 @@
if(!HMAC.verifyWithSHA256(node.failureTable.offerAuthenticatorKey,
key.getFullKey(), authenticator)) {
Logger.error(this, "Invalid offer request from
"+source+" : authenticator did not verify");
try {
-
source.sendAsync(DMT.createFNPGetOfferedKeyInvalid(uid,
DMT.GET_OFFERED_KEY_REJECTED_BAD_AUTHENTICATOR), null, 0,
node.failureTable.senderCounter);
+
source.sendAsync(DMT.createFNPGetOfferedKeyInvalid(uid,
DMT.GET_OFFERED_KEY_REJECTED_BAD_AUTHENTICATOR), null,
node.failureTable.senderCounter);
} catch (NotConnectedException e) {
// Too bad.
}
@@ -267,7 +267,7 @@
Logger.normal(this, "Rejecting FNPGetOfferedKey from
"+source+" for "+key+" : "+reject);
Message rejected = DMT.createFNPRejectedOverload(uid,
true);
try {
- source.sendAsync(rejected, null, 0,
node.failureTable.senderCounter);
+ source.sendAsync(rejected, null,
node.failureTable.senderCounter);
} catch (NotConnectedException e) {
Logger.normal(this, "Rejecting (overload) data
request from "+source.getPeer()+": "+e);
}
@@ -346,7 +346,7 @@
if(node.recentlyCompleted(id)) {
Message rejected = DMT.createFNPRejectedLoop(id);
try {
- source.sendAsync(rejected, null, 0, ctr);
+ source.sendAsync(rejected, null, ctr);
} catch (NotConnectedException e) {
Logger.normal(this, "Rejecting data request
(loop, finished): "+e);
}
@@ -358,7 +358,7 @@
if(logMINOR) Logger.minor(this, "Could not lock ID
"+id+" -> rejecting (already running)");
Message rejected = DMT.createFNPRejectedLoop(id);
try {
- source.sendAsync(rejected, null, 0, ctr);
+ source.sendAsync(rejected, null, ctr);
} catch (NotConnectedException e) {
Logger.normal(this, "Rejecting request from
"+source.getPeer()+": "+e);
}
@@ -373,7 +373,7 @@
Logger.normal(this, "Rejecting "+(isSSK ? "SSK" :
"CHK")+" request from "+source.getPeer()+" preemptively because "+rejectReason);
Message rejected = DMT.createFNPRejectedOverload(id,
true);
try {
- source.sendAsync(rejected, null, 0, ctr);
+ source.sendAsync(rejected, null, ctr);
} catch (NotConnectedException e) {
Logger.normal(this, "Rejecting (overload) data
request from "+source.getPeer()+": "+e);
}
@@ -396,7 +396,7 @@
if(node.recentlyCompleted(id)) {
Message rejected = DMT.createFNPRejectedLoop(id);
try {
- source.sendAsync(rejected, null, 0, ctr);
+ source.sendAsync(rejected, null, ctr);
} catch (NotConnectedException e) {
Logger.normal(this, "Rejecting insert request
from "+source.getPeer()+": "+e);
}
@@ -406,7 +406,7 @@
if(logMINOR) Logger.minor(this, "Could not lock ID
"+id+" -> rejecting (already running)");
Message rejected = DMT.createFNPRejectedLoop(id);
try {
- source.sendAsync(rejected, null, 0, ctr);
+ source.sendAsync(rejected, null, ctr);
} catch (NotConnectedException e) {
Logger.normal(this, "Rejecting insert request
from "+source.getPeer()+": "+e);
}
@@ -418,7 +418,7 @@
Logger.normal(this, "Rejecting insert from
"+source.getPeer()+" preemptively because "+rejectReason);
Message rejected = DMT.createFNPRejectedOverload(id,
true);
try {
- source.sendAsync(rejected, null, 0, ctr);
+ source.sendAsync(rejected, null, ctr);
} catch (NotConnectedException e) {
Logger.normal(this, "Rejecting (overload)
insert request from "+source.getPeer()+": "+e);
}
@@ -453,7 +453,7 @@
if(node.recentlyCompleted(id)) {
Message rejected = DMT.createFNPRejectedLoop(id);
try {
- source.sendAsync(rejected, null, 0,
node.nodeStats.probeRequestCtr);
+ source.sendAsync(rejected, null,
node.nodeStats.probeRequestCtr);
} catch (NotConnectedException e) {
Logger.normal(this, "Rejecting probe request
from "+source.getPeer()+": "+e);
}
@@ -466,7 +466,7 @@
Logger.normal(this, "Rejecting probe request from
"+source.getPeer());
Message rejected = DMT.createFNPRejectedOverload(id,
true);
try {
- source.sendAsync(rejected, null, 0,
node.nodeStats.probeRequestCtr);
+ source.sendAsync(rejected, null,
node.nodeStats.probeRequestCtr);
} catch (NotConnectedException e) {
Logger.normal(this, "Rejecting (overload)
insert request from "+source.getPeer()+": "+e);
}
@@ -477,7 +477,7 @@
Logger.normal(this, "Rejecting invalid
(target="+target+") probe request from "+source.getPeer());
Message rejected = DMT.createFNPRejectedOverload(id,
true);
try {
- source.sendAsync(rejected, null, 0,
node.nodeStats.probeRequestCtr);
+ source.sendAsync(rejected, null,
node.nodeStats.probeRequestCtr);
} catch (NotConnectedException e) {
Logger.normal(this, "Rejecting (invalid) insert
request from "+source.getPeer()+": "+e);
}
@@ -493,7 +493,7 @@
if(om == null || !source.canAcceptAnnouncements()) {
Message msg = DMT.createFNPOpennetDisabled(uid);
try {
- source.sendAsync(msg, null, 0,
node.nodeStats.announceByteCounter);
+ source.sendAsync(msg, null,
node.nodeStats.announceByteCounter);
} catch (NotConnectedException e) {
// Ok
}
@@ -502,7 +502,7 @@
if(node.recentlyCompleted(uid)) {
Message msg = DMT.createFNPRejectedLoop(uid);
try {
- source.sendAsync(msg, null, 0,
node.nodeStats.announceByteCounter);
+ source.sendAsync(msg, null,
node.nodeStats.announceByteCounter);
} catch (NotConnectedException e) {
// Ok
}
@@ -516,7 +516,7 @@
if(!source.shouldAcceptAnnounce(uid)) {
Message msg =
DMT.createFNPRejectedOverload(uid, true);
try {
- source.sendAsync(msg, null, 0,
node.nodeStats.announceByteCounter);
+ source.sendAsync(msg, null,
node.nodeStats.announceByteCounter);
} catch (NotConnectedException e) {
// Ok
}
@@ -596,7 +596,7 @@
// Relay.
if(rc.source != null) {
try {
-
rc.source.sendAsync(DMT.createFNPRoutedRejected(id, (short)0), null, 0,
nodeStats.routedMessageCtr);
+
rc.source.sendAsync(DMT.createFNPRoutedRejected(id, (short)0), null,
nodeStats.routedMessageCtr);
} catch (NotConnectedException e) {
// Ouch.
Logger.error(this, "Unable to relay
probe DNF: peer disconnected: "+rc.source);
@@ -626,7 +626,7 @@
ctx = routedContexts.get(lid);
if(ctx != null) {
try {
-
source.sendAsync(DMT.createFNPRoutedRejected(id, htl), null, 0,
nodeStats.routedMessageCtr);
+
source.sendAsync(DMT.createFNPRoutedRejected(id, htl), null,
nodeStats.routedMessageCtr);
} catch (NotConnectedException e) {
if(logMINOR) Logger.minor(this, "Lost
connection rejecting "+m);
}
@@ -648,7 +648,7 @@
} else if(htl == 0) {
Message reject = DMT.createFNPRoutedRejected(id,
(short)0);
if(source != null) try {
- source.sendAsync(reject, null, 0,
nodeStats.routedMessageCtr);
+ source.sendAsync(reject, null,
nodeStats.routedMessageCtr);
} catch (NotConnectedException e) {
if(logMINOR) Logger.minor(this, "Lost
connection rejecting "+m);
}
@@ -670,7 +670,7 @@
PeerNode pn = ctx.source;
if(pn == null) return false;
try {
- pn.sendAsync(m, null, 0, nodeStats.routedMessageCtr);
+ pn.sendAsync(m, null, nodeStats.routedMessageCtr);
} catch (NotConnectedException e) {
if(logMINOR) Logger.minor(this, "Lost connection
forwarding "+m+" to "+pn);
}
@@ -696,7 +696,7 @@
if(logMINOR) Logger.minor(this, "Forwarding
"+m.getSpec()+" to "+next.getPeer().getPort());
ctx.addSent(next);
try {
- next.sendAsync(m, null, 0,
nodeStats.routedMessageCtr);
+ next.sendAsync(m, null,
nodeStats.routedMessageCtr);
} catch (NotConnectedException e) {
continue;
}
@@ -705,7 +705,7 @@
// Reached a dead end...
Message reject =
DMT.createFNPRoutedRejected(id, htl);
if(pn != null) try {
- pn.sendAsync(reject, null, 0,
nodeStats.routedMessageCtr);
+ pn.sendAsync(reject, null,
nodeStats.routedMessageCtr);
} catch (NotConnectedException e) {
Logger.error(this, "Cannot send reject
message back to source "+pn);
return true;
@@ -741,7 +741,7 @@
Message reply = DMT.createFNPRoutedPong(id, x);
if(logMINOR) Logger.minor(this, "Replying - counter =
"+x+" for "+id);
try {
- src.sendAsync(reply, null, 0,
nodeStats.routedMessageCtr);
+ src.sendAsync(reply, null,
nodeStats.routedMessageCtr);
} catch (NotConnectedException e) {
if(logMINOR) Logger.minor(this, "Lost
connection replying to "+m+" in dispatchRoutedMessage");
}
Modified: trunk/freenet/src/freenet/node/OpennetManager.java
===================================================================
--- trunk/freenet/src/freenet/node/OpennetManager.java 2008-09-19 16:53:54 UTC
(rev 22707)
+++ trunk/freenet/src/freenet/node/OpennetManager.java 2008-09-19 17:39:53 UTC
(rev 22708)
@@ -520,7 +520,7 @@
long xferUID = node.random.nextLong();
Message msg2 = isReply ?
DMT.createFNPOpennetConnectReplyNew(uid, xferUID, noderef.length,
padded.length) :
DMT.createFNPOpennetConnectDestinationNew(uid, xferUID,
noderef.length, padded.length);
- peer.sendAsync(msg2, null, 0, ctr);
+ peer.sendAsync(msg2, null, ctr);
innerSendOpennetRef(xferUID, padded, peer, ctr);
}
@@ -551,7 +551,7 @@
long xferUID = node.random.nextLong();
Message msg = DMT.createFNPOpennetAnnounceRequest(uid, xferUID,
noderef.length,
paddedSize(noderef.length), target, htl);
- peer.sendAsync(msg, null, 0, ctr);
+ peer.sendAsync(msg, null, ctr);
return xferUID;
}
@@ -582,7 +582,7 @@
long xferUID = node.random.nextLong();
Message msg = DMT.createFNPOpennetAnnounceReply(uid, xferUID,
noderef.length,
padded.length);
- peer.sendAsync(msg, null, 0, ctr);
+ peer.sendAsync(msg, null, ctr);
innerSendOpennetRef(xferUID, padded, peer, ctr);
}
@@ -664,7 +664,7 @@
public void rejectRef(long uid, PeerNode source, int reason,
ByteCounter ctr) {
Message msg = DMT.createFNPOpennetNoderefRejected(uid, reason);
try {
- source.sendAsync(msg, null, 0, ctr);
+ source.sendAsync(msg, null, ctr);
} catch (NotConnectedException e) {
// Ignore
}
Modified: trunk/freenet/src/freenet/node/OutgoingPacketMangler.java
===================================================================
--- trunk/freenet/src/freenet/node/OutgoingPacketMangler.java 2008-09-19
16:53:54 UTC (rev 22707)
+++ trunk/freenet/src/freenet/node/OutgoingPacketMangler.java 2008-09-19
17:39:53 UTC (rev 22708)
@@ -43,7 +43,7 @@
* @return Total size including UDP headers of the sent packet.
*/
public int processOutgoing(byte[] buf, int offset, int length,
- KeyTracker tracker, int alreadyReportedBytes, short
priority)
+ KeyTracker tracker, short priority)
throws KeyChangedException, NotConnectedException,
PacketSequenceException, WouldBlockException;
@@ -66,7 +66,7 @@
*/
public int processOutgoingPreformatted(byte[] buf, int offset, int
length,
KeyTracker tracker, int packetNumber,
- AsyncMessageCallback[] callbacks, int
alreadyReportedBytes, short priority)
+ AsyncMessageCallback[] callbacks, short priority)
throws KeyChangedException, NotConnectedException,
PacketSequenceException, WouldBlockException;
Modified: trunk/freenet/src/freenet/node/PacketSender.java
===================================================================
--- trunk/freenet/src/freenet/node/PacketSender.java 2008-09-19 16:53:54 UTC
(rev 22707)
+++ trunk/freenet/src/freenet/node/PacketSender.java 2008-09-19 17:39:53 UTC
(rev 22708)
@@ -12,6 +12,7 @@
import freenet.io.comm.Message;
import freenet.io.comm.NotConnectedException;
import freenet.io.comm.PacketSocketHandler;
+import freenet.io.comm.UdpSocketHandler;
import freenet.support.FileLoggerHook;
import freenet.support.Logger;
import freenet.support.OOMHandler;
@@ -162,11 +163,16 @@
public void run() {
if(logMINOR) Logger.minor(this, "In PacketSender.run()");
freenet.support.Logger.OSThread.logPID(this);
+ /*
+ * Index of the point in the nodes list at which we sent a
packet and then
+ * ran out of bandwidth. We start the loop from here next time.
+ */
+ int brokeAt = 0;
while(true) {
lastReceivedPacketFromAnyNode = lastReportedNoPackets;
try {
logMINOR = Logger.shouldLog(Logger.MINOR, this);
- realRun();
+ brokeAt = realRun(brokeAt);
} catch(OutOfMemoryError e) {
OOMHandler.handleOOM(e);
System.err.println("Will retry above failed
operation...");
@@ -178,7 +184,7 @@
}
}
- private void realRun() {
+ private int realRun(int brokeAt) {
long now = System.currentTimeMillis();
lastTimeInSeconds = (int) (now / 1000);
PeerManager pm = node.peers;
@@ -200,9 +206,27 @@
long oldTempNow = now;
// Needs to be run very frequently. Maybe change to a regular
once per second schedule job?
// Maybe not worth it as it is fairly lightweight.
+ // FIXME given the lock contention, maybe it's worth it? What
about
+ // running it on the UdpSocketHandler thread? That would surely
be better...?
node.lm.removeTooOldQueuedItems();
+
+ boolean canSendThrottled = false;
+
+ int MAX_PACKET_SIZE =
node.darknetCrypto.socket.getMaxPacketSize();
+ long count = node.outputThrottle.count();
+ if(count > MAX_PACKET_SIZE)
+ canSendThrottled = true;
+ else {
+ long canSendAt = node.outputThrottle.getNanosPerTick()
* (MAX_PACKET_SIZE - count);
+ canSendAt = (canSendAt / (1000*1000)) + (canSendAt %
(1000*1000) == 0 ? 0 : 1);
+ if(logMINOR)
+ Logger.minor(this, "Can send throttled packets
in "+canSendAt+"ms");
+ nextActionTime = Math.min(nextActionTime, now +
canSendAt);
+ }
+
+ int newBrokeAt = 0;
for(int i = 0; i < nodes.length; i++) {
- PeerNode pn = nodes[i];
+ PeerNode pn = nodes[(i + brokeAt) % nodes.length];
lastReceivedPacketFromAnyNode =
Math.max(pn.lastReceivedPacketTime(),
lastReceivedPacketFromAnyNode);
pn.maybeOnConnect();
@@ -210,6 +234,8 @@
// Might as well do it properly.
node.peers.disconnect(pn, true, true);
}
+ if(pn.shouldThrottle() && !canSendThrottled)
+ continue;
if(pn.isConnected()) {
// Is the node dead?
@@ -230,7 +256,20 @@
continue;
}
- pn.maybeSendPacket(now, rpiTemp, rpiIntTemp);
+ if(pn.maybeSendPacket(now, rpiTemp, rpiIntTemp)
&& canSendThrottled) {
+ canSendThrottled = false;
+ count = node.outputThrottle.count();
+ if(count > MAX_PACKET_SIZE)
+ canSendThrottled = true;
+ else {
+ long canSendAt =
node.outputThrottle.getNanosPerTick() * (MAX_PACKET_SIZE - count);
+ canSendAt = (canSendAt /
(1000*1000)) + (canSendAt % (1000*1000) == 0 ? 0 : 1);
+ if(logMINOR)
+ Logger.minor(this, "Can
send throttled packets in "+canSendAt+"ms");
+ nextActionTime =
Math.min(nextActionTime, now + canSendAt);
+ newBrokeAt = i;
+ }
+ }
long urgentTime = pn.getNextUrgentTime(now);
// Should spam the logs, unless there is a
deadlock
@@ -256,6 +295,7 @@
Logger.error(this, "tempNow is more than 5
seconds past oldTempNow (" + (tempNow - oldTempNow) + ") in PacketSender
working with " + pn.userToString());
oldTempNow = tempNow;
}
+ brokeAt = newBrokeAt;
// Consider sending connect requests to our opennet old-peers.
// No point if they are NATed, of course... but we don't know
whether they are.
@@ -372,6 +412,7 @@
// because a new packet came in.
}
}
+ return brokeAt;
}
/** Wake up, and send any queued packets. */
Modified: trunk/freenet/src/freenet/node/PeerManager.java
===================================================================
--- trunk/freenet/src/freenet/node/PeerManager.java 2008-09-19 16:53:54 UTC
(rev 22707)
+++ trunk/freenet/src/freenet/node/PeerManager.java 2008-09-19 17:39:53 UTC
(rev 22708)
@@ -517,7 +517,7 @@
if(removePeer(pn))
writePeers();
}
- }, 0, ctrDisconn);
+ }, ctrDisconn);
} catch(NotConnectedException e) {
if(pn.isDisconnecting() && removePeer(pn))
writePeers();
@@ -723,7 +723,7 @@
if(onlyRealConnections && !peers[i].isRealConnection())
continue;
try {
- peers[i].sendAsync(msg, null, 0, ctr);
+ peers[i].sendAsync(msg, null, ctr);
} catch(NotConnectedException e) {
// Ignore
}
Modified: trunk/freenet/src/freenet/node/PeerNode.java
===================================================================
--- trunk/freenet/src/freenet/node/PeerNode.java 2008-09-19 16:53:54 UTC
(rev 22707)
+++ trunk/freenet/src/freenet/node/PeerNode.java 2008-09-19 17:39:53 UTC
(rev 22708)
@@ -1024,7 +1024,7 @@
* relating to this packet (normally set when we have delayed a packet
in order to
* throttle it).
*/
- public void sendAsync(Message msg, AsyncMessageCallback cb, int
alreadyReportedBytes, ByteCounter ctr) throws NotConnectedException {
+ public void sendAsync(Message msg, AsyncMessageCallback cb, ByteCounter
ctr) throws NotConnectedException {
if(ctr == null)
Logger.error(this, "Bytes not logged", new
Exception("debug"));
if(logMINOR)
@@ -1032,7 +1032,7 @@
if(!isConnected())
throw new NotConnectedException();
addToLocalNodeSentMessagesToStatistic(msg);
- MessageItem item = new MessageItem(msg, cb == null ? null : new
AsyncMessageCallback[]{cb}, alreadyReportedBytes, ctr, this);
+ MessageItem item = new MessageItem(msg, cb == null ? null : new
AsyncMessageCallback[]{cb}, ctr, this);
long now = System.currentTimeMillis();
reportBackoffStatus(now);
int x = 0;
@@ -1562,7 +1562,7 @@
*/
public void sendSync(Message req, ByteCounter ctr) throws
NotConnectedException {
SyncMessageCallback cb = new SyncMessageCallback();
- sendAsync(req, cb, 0, ctr);
+ sendAsync(req, cb, ctr);
cb.waitForSend(60 * 1000);
if (!cb.done) {
Logger.error(this, "Waited too long for a blocking send
for " + req + " to " + PeerNode.this, new Exception("error"));
@@ -2119,12 +2119,12 @@
try {
if(isRealConnection())
- sendAsync(locMsg, null, 0,
node.nodeStats.initialMessagesCtr);
- sendAsync(ipMsg, null, 0,
node.nodeStats.initialMessagesCtr);
- sendAsync(timeMsg, null, 0,
node.nodeStats.initialMessagesCtr);
- sendAsync(packetsMsg, null, 0,
node.nodeStats.initialMessagesCtr);
- sendAsync(dRoutingMsg, null, 0,
node.nodeStats.initialMessagesCtr);
- sendAsync(uptimeMsg, null, 0,
node.nodeStats.initialMessagesCtr);
+ sendAsync(locMsg, null,
node.nodeStats.initialMessagesCtr);
+ sendAsync(ipMsg, null,
node.nodeStats.initialMessagesCtr);
+ sendAsync(timeMsg, null,
node.nodeStats.initialMessagesCtr);
+ sendAsync(packetsMsg, null,
node.nodeStats.initialMessagesCtr);
+ sendAsync(dRoutingMsg, null,
node.nodeStats.initialMessagesCtr);
+ sendAsync(uptimeMsg, null,
node.nodeStats.initialMessagesCtr);
} catch(NotConnectedException e) {
Logger.error(this, "Completed handshake with " +
getPeer() + " but disconnected (" + isConnected + ':' + currentTracker + "!!!:
" + e, e);
}
@@ -2163,7 +2163,7 @@
private void sendIPAddressMessage() {
Message ipMsg = DMT.createFNPDetectedIPAddress(detectedPeer);
try {
- sendAsync(ipMsg, null, 0, node.nodeStats.changedIPCtr);
+ sendAsync(ipMsg, null, node.nodeStats.changedIPCtr);
} catch(NotConnectedException e) {
Logger.normal(this, "Sending IP change message to " +
this + " but disconnected: " + e, e);
}
@@ -2478,7 +2478,7 @@
if(t < now || forceSendPrimary) {
try {
if(logMINOR) Logger.minor(this,
"Sending urgent notifications for current tracker on "+shortToString());
- int size =
outgoingMangler.processOutgoing(null, 0, 0, tracker, 0, DMT.PRIORITY_NOW);
+ int size =
outgoingMangler.processOutgoing(null, 0, 0, tracker, DMT.PRIORITY_NOW);
node.nodeStats.reportNotificationOnlyPacketSent(size);
} catch(NotConnectedException e) {
// Ignore
@@ -2495,7 +2495,7 @@
if(t < now)
try {
if(logMINOR) Logger.minor(this,
"Sending urgent notifications for previous tracker on "+shortToString());
- int size =
outgoingMangler.processOutgoing(null, 0, 0, tracker, 0, DMT.PRIORITY_NOW);
+ int size =
outgoingMangler.processOutgoing(null, 0, 0, tracker, DMT.PRIORITY_NOW);
node.nodeStats.reportNotificationOnlyPacketSent(size);
} catch(NotConnectedException e) {
// Ignore
@@ -2712,7 +2712,7 @@
Logger.error(this, "No tracker to resend packet
" + item.packetNumber + " on");
continue;
}
- MessageItem mi = new MessageItem(item.buf,
item.callbacks, true, 0, resendByteCounter, item.priority);
+ MessageItem mi = new MessageItem(item.buf,
item.callbacks, true, resendByteCounter, item.priority);
requeueMessageItems(new MessageItem[]{mi}, 0, 1, true);
}
}
@@ -3431,7 +3431,7 @@
byte[] authenticator =
HMAC.macWithSHA256(node.failureTable.offerAuthenticatorKey, keyBytes, 32);
Message msg = DMT.createFNPOfferKey(key, authenticator);
try {
- sendAsync(msg, null, 0, node.nodeStats.sendOffersCtr);
+ sendAsync(msg, null, node.nodeStats.sendOffersCtr);
} catch(NotConnectedException e) {
// Ignore
}
@@ -3873,7 +3873,7 @@
n2nm = DMT.createNodeToNodeMessage(
n2nType,
fs.toString().getBytes("UTF-8"));
try {
- sendAsync(n2nm, null, 0,
node.nodeStats.nodeToNodeCounter);
+ sendAsync(n2nm, null,
node.nodeStats.nodeToNodeCounter);
} catch (NotConnectedException e) {
if(includeSentTime) {
fs.removeValue("sentTime");
@@ -3950,7 +3950,7 @@
void sendFNPNetworkID(ByteCounter ctr) throws NotConnectedException {
if (assignedNetworkID!=0)
- sendAsync(DMT.createFNPNetworkID(assignedNetworkID),
null, 0, ctr);
+ sendAsync(DMT.createFNPNetworkID(assignedNetworkID),
null, ctr);
}
public boolean shouldThrottle() {
@@ -3997,7 +3997,7 @@
if(logMINOR) Logger.minor(this, "Sending throttled message with
timeout "+timeout+" packet size "+packetSize+" to "+shortToString());
for(int i=0;i<100;i++) {
try {
- getThrottle().sendThrottledMessage(msg, this,
node.outputThrottle, packetSize, ctr, deadline, blockForSend);
+ getThrottle().sendThrottledMessage(msg, this,
packetSize, ctr, deadline, blockForSend);
return;
} catch (ThrottleDeprecatedException e) {
// Try with the new throttle. We don't need it,
we'll get it from getThrottle().
@@ -4062,7 +4062,7 @@
* @param rpiTemp
* @param rpiTemp
*/
- public void maybeSendPacket(long now, Vector rpiTemp, int[] rpiIntTemp)
{
+ public boolean maybeSendPacket(long now, Vector rpiTemp, int[]
rpiIntTemp) {
// If there are any urgent notifications, we must send a packet.
boolean mustSend = false;
if(mustSendNotificationsNow(now)) {
@@ -4091,23 +4091,23 @@
if(logMINOR)
Logger.minor(this, "Resending "
+ item.packetNumber + " to " + item.kt);
getOutgoingMangler().resend(item);
- return;
+ return true;
} catch(KeyChangedException e) {
Logger.error(this, "Caught " + e + "
resending packets to " + kt);
requeueResendItems(rpiTemp);
- return;
+ return false;
} catch(NotConnectedException e) {
Logger.normal(this, "Caught " + e + "
resending packets to " + kt);
requeueResendItems(rpiTemp);
- return;
+ return false;
} catch(PacketSequenceException e) {
Logger.error(this, "Caught " + e + " -
disconnecting", e);
// PSE is fairly drastic, something is
broken between us, but maybe we can resync
forceDisconnect(false);
- return;
+ return false;
} catch(WouldBlockException e) {
Logger.error(this, "Impossible: " + e,
e);
- return;
+ return false;
}
}
@@ -4233,15 +4233,16 @@
// Force packet to have a sequence number.
Message m = DMT.createFNPVoid();
addToLocalNodeSentMessagesToStatistic(m);
- messages.add(new MessageItem(m, null, 0, null, this));
+ messages.add(new MessageItem(m, null, null, this));
}
- if(messages.isEmpty()) return;
+ if(messages.isEmpty()) return false;
// Send packets, right now, blocking, including any active
notifications
// Note that processOutgoingOrRequeue will drop messages from
the end
// if necessary to fit the messages into a single packet.
getOutgoingMangler().processOutgoingOrRequeue(messages.toArray(new
MessageItem[messages.size()]), this, true, false, true);
-
+
+ return true;
}
}
Modified: trunk/freenet/src/freenet/node/RequestHandler.java
===================================================================
--- trunk/freenet/src/freenet/node/RequestHandler.java 2008-09-19 16:53:54 UTC
(rev 22707)
+++ trunk/freenet/src/freenet/node/RequestHandler.java 2008-09-19 17:39:53 UTC
(rev 22708)
@@ -136,7 +136,7 @@
Logger.minor(this, "Handling a request: " + uid);
Message accepted = DMT.createFNPAccepted(uid);
- source.sendAsync(accepted, null, 0, this);
+ source.sendAsync(accepted, null, this);
Object o = node.makeRequestSender(key, htl, uid, source, false,
true, false, false);
if(o instanceof KeyBlock) {
@@ -168,7 +168,7 @@
// Forward RejectedOverload
//Note: This message is only decernable from
the terminal messages by the IS_LOCAL flag being false. (!IS_LOCAL)->!Terminal
Message msg =
DMT.createFNPRejectedOverload(uid, false);
- source.sendAsync(msg, null, 0, this);
+ source.sendAsync(msg, null, this);
//If the status changes (e.g. to SUCCESS),
there is little need to send yet another reject overload.
sentRejectedOverload = true;
}
@@ -182,7 +182,7 @@
try {
// Is a CHK.
Message df = DMT.createFNPCHKDataFound(uid,
rs.getHeaders());
- source.sendAsync(df, null, 0, this);
+ source.sendAsync(df, null, this);
PartiallyReceivedBlock prb = rs.getPRB();
bt =
@@ -339,7 +339,7 @@
// SUCCESS requires that BOTH the pubkey AND the data/headers
have been received.
// The pubKey will have been set on the SSK key, and the
SSKBlock will have been constructed.
Message headersMsg = DMT.createFNPSSKDataFoundHeaders(uid,
headers);
- source.sendAsync(headersMsg, null, 0, this);
+ source.sendAsync(headersMsg, null, this);
final Message dataMsg = DMT.createFNPSSKDataFoundData(uid,
data);
node.executor.execute(new PrioRunnable() {
@@ -366,13 +366,13 @@
if(SEND_OLD_FORMAT_SSK) {
Message df = DMT.createFNPSSKDataFound(uid, headers,
data);
- source.sendAsync(df, null, 0, this);
+ source.sendAsync(df, null, this);
// Not throttled, so report payload here.
sentPayload(data.length);
}
if(needsPubKey) {
Message pk = DMT.createFNPSSKPubKey(uid, pubKey);
- source.sendAsync(pk, null, 0, this);
+ source.sendAsync(pk, null, this);
}
}
@@ -380,7 +380,7 @@
// SUCCESS requires that BOTH the pubkey AND the data/headers
have been received.
// The pubKey will have been set on the SSK key, and the
SSKBlock will have been constructed.
Message headersMsg = DMT.createFNPSSKDataFoundHeaders(uid,
headers);
- source.sendAsync(headersMsg, null, 0, ctr);
+ source.sendAsync(headersMsg, null, ctr);
final Message dataMsg = DMT.createFNPSSKDataFoundData(uid,
data);
try {
source.sendThrottledMessage(dataMsg, data.length, ctr,
60 * 1000, false);
@@ -391,13 +391,13 @@
if(SEND_OLD_FORMAT_SSK) {
Message df = DMT.createFNPSSKDataFound(uid, headers,
data);
- source.sendAsync(df, null, 0, ctr);
+ source.sendAsync(df, null, ctr);
// Not throttled, so report payload here.
ctr.sentPayload(data.length);
}
if(needsPubKey) {
Message pk = DMT.createFNPSSKPubKey(uid, pubKey);
- source.sendAsync(pk, null, 0, ctr);
+ source.sendAsync(pk, null, ctr);
}
}
@@ -417,7 +417,7 @@
BlockTransmitter bt =
new BlockTransmitter(node.usm, source, uid,
prb, this);
node.addTransferringRequestHandler(uid);
- source.sendAsync(df, null, 0, this);
+ source.sendAsync(df, null, this);
if(bt.send(node.executor)) {
// for byte logging
status = RequestSender.SUCCESS;
@@ -451,7 +451,7 @@
else
sendTerminalCalled = true;
- source.sendAsync(msg, new TerminalMessageByteCountCollector(),
0, this);
+ source.sendAsync(msg, new TerminalMessageByteCountCollector(),
this);
}
boolean sendTerminalCalled = false;
Modified: trunk/freenet/src/freenet/node/RequestSender.java
===================================================================
--- trunk/freenet/src/freenet/node/RequestSender.java 2008-09-19 16:53:54 UTC
(rev 22707)
+++ trunk/freenet/src/freenet/node/RequestSender.java 2008-09-19 17:39:53 UTC
(rev 22708)
@@ -209,7 +209,7 @@
}
Message msg = DMT.createFNPGetOfferedKey(key,
offer.authenticator, pubKey == null, uid);
try {
- pn.sendAsync(msg, null, 0, this);
+ pn.sendAsync(msg, null, this);
} catch (NotConnectedException e2) {
if(logMINOR)
Logger.minor(this, "Disconnected:
"+pn+" getting offer for "+key);
Modified: trunk/freenet/src/freenet/node/ResettingHTLProbeRequestHandler.java
===================================================================
--- trunk/freenet/src/freenet/node/ResettingHTLProbeRequestHandler.java
2008-09-19 16:53:54 UTC (rev 22707)
+++ trunk/freenet/src/freenet/node/ResettingHTLProbeRequestHandler.java
2008-09-19 17:39:53 UTC (rev 22708)
@@ -43,8 +43,8 @@
Message accepted = DMT.createFNPAccepted(uid);
Message trace = DMT.createFNPRHProbeTrace(uid,
sender.getNearestLoc(), sender.getBest(), htl, (short)1, (short)1,
n.getLocation(), n.swapIdentifier, LocationManager.extractLocs(peers, true),
LocationManager.extractUIDs(peers), (short)0, (short)1, "",
source.swapIdentifier);
try {
- source.sendAsync(accepted, null, 0, sender);
- source.sendAsync(trace, null, 0, sender);
+ source.sendAsync(accepted, null, sender);
+ source.sendAsync(trace, null, sender);
} catch (NotConnectedException e) {
// We completed(id), rather than locking it, so we
don't need to unlock.
return; // So all we need to do is not start the sender.
@@ -53,33 +53,33 @@
}
public void onCompletion(double nearest, double best, short counter,
short uniqueCounter, short linearCounter) throws NotConnectedException {
- source.sendAsync(DMT.createFNPRHProbeReply(uid, nearest, best,
counter, uniqueCounter, linearCounter), null, 0, sender);
+ source.sendAsync(DMT.createFNPRHProbeReply(uid, nearest, best,
counter, uniqueCounter, linearCounter), null, sender);
}
public void onRNF(short htl, double nearest, double best, short
counter, short uniqueCounter, short linearCounter) throws NotConnectedException
{
Message rnf = DMT.createFNPRouteNotFound(uid, htl);
Message sub = DMT.createFNPRHReturnSubMessage(nearest, best,
counter, uniqueCounter, linearCounter, "rnf");
rnf.addSubMessage(sub);
- source.sendAsync(rnf, null, 0, sender);
+ source.sendAsync(rnf, null, sender);
}
public void onReceivedRejectOverload(double nearest, double best, short
counter, short uniqueCounter, short linearCounter, String reason) throws
NotConnectedException {
Message ro = DMT.createFNPRejectedOverload(uid, false);
Message sub = DMT.createFNPRHReturnSubMessage(nearest, best,
counter, uniqueCounter, linearCounter, reason);
ro.addSubMessage(sub);
- source.sendAsync(ro, null, 0, sender);
+ source.sendAsync(ro, null, sender);
}
public void onTimeout(double nearest, double best, short counter, short
uniqueCounter, short linearCounter, String reason) throws NotConnectedException
{
Message ro = DMT.createFNPRejectedOverload(uid, true);
Message sub = DMT.createFNPRHReturnSubMessage(nearest, best,
counter, uniqueCounter, linearCounter, reason);
ro.addSubMessage(sub);
- source.sendAsync(ro, null, 0, sender);
+ source.sendAsync(ro, null, sender);
}
public void onTrace(long uid, double nearest, double best, short htl,
short counter, short uniqueCounter, double location, long myUID, ShortBuffer
peerLocs, ShortBuffer peerUIDs, short forkCount, short linearCounter, String
reason, long prevUID) throws NotConnectedException {
Message trace = DMT.createFNPRHProbeTrace(uid, nearest, best,
htl, counter, uniqueCounter, location, myUID, peerLocs, peerUIDs, forkCount,
linearCounter, reason, prevUID);
- source.sendAsync(trace, null, 0, sender);
+ source.sendAsync(trace, null, sender);
}
}
Modified: trunk/freenet/src/freenet/node/SSKInsertHandler.java
===================================================================
--- trunk/freenet/src/freenet/node/SSKInsertHandler.java 2008-09-19
16:53:54 UTC (rev 22707)
+++ trunk/freenet/src/freenet/node/SSKInsertHandler.java 2008-09-19
17:39:53 UTC (rev 22708)
@@ -86,7 +86,7 @@
Message accepted = DMT.createFNPSSKAccepted(uid, pubKey == null);
try {
- source.sendAsync(accepted, null, 0, this);
+ source.sendAsync(accepted, null, this);
} catch (NotConnectedException e1) {
if(logMINOR) Logger.minor(this, "Lost connection to
source");
return;
@@ -141,7 +141,7 @@
if(logMINOR) Logger.minor(this, "Got
pubkey on "+uid+" : "+pubKey);
Message confirm =
DMT.createFNPSSKPubKeyAccepted(uid);
try {
- source.sendAsync(confirm, null,
0, this);
+ source.sendAsync(confirm, null,
this);
} catch (NotConnectedException e) {
if(logMINOR) Logger.minor(this,
"Lost connection to source on "+uid);
return;
Modified: trunk/freenet/src/freenet/node/SSKInsertSender.java
===================================================================
--- trunk/freenet/src/freenet/node/SSKInsertSender.java 2008-09-19 16:53:54 UTC
(rev 22707)
+++ trunk/freenet/src/freenet/node/SSKInsertSender.java 2008-09-19 17:39:53 UTC
(rev 22708)
@@ -170,9 +170,9 @@
// Send to next node
try {
- next.sendAsync(request, null, 0, this);
+ next.sendAsync(request, null, this);
if(RequestHandler.SEND_OLD_FORMAT_SSK) {
- next.sendAsync(req, null, 0, this);
+ next.sendAsync(req, null, this);
// Not throttled, so report here.
sentPayload(data.length);
}
@@ -248,7 +248,7 @@
Message dataMsg = DMT.createFNPSSKInsertRequestData(uid, data);
try {
- next.sendAsync(headersMsg, null, 0, this);
+ next.sendAsync(headersMsg, null, this);
next.sendThrottledMessage(dataMsg, data.length,
this, SSKInsertHandler.DATA_INSERT_TIMEOUT, false);
} catch (NotConnectedException e1) {
if(logMINOR) Logger.minor(this, "Not connected
to "+next);
@@ -265,7 +265,7 @@
if(msg.getBoolean(DMT.NEED_PUB_KEY)) {
Message pkMsg = DMT.createFNPSSKPubKey(uid, pubKey);
try {
- next.sendAsync(pkMsg, null, 0, this);
+ next.sendAsync(pkMsg, null, this);
} catch (NotConnectedException e) {
if(logMINOR) Logger.minor(this, "Node disconnected
while sending pubkey: "+next);
continue;
Modified: trunk/freenet/src/freenet/node/SendMessageOnErrorCallback.java
===================================================================
--- trunk/freenet/src/freenet/node/SendMessageOnErrorCallback.java
2008-09-19 16:53:54 UTC (rev 22707)
+++ trunk/freenet/src/freenet/node/SendMessageOnErrorCallback.java
2008-09-19 17:39:53 UTC (rev 22708)
@@ -44,7 +44,7 @@
if(Logger.shouldLog(Logger.MINOR, this))
Logger.minor(this, "Disconnect trigger: "+this);
try {
- dest.sendAsync(msg, null, 0, ctr);
+ dest.sendAsync(msg, null, ctr);
} catch (NotConnectedException e) {
if(Logger.shouldLog(Logger.MINOR, this))
Logger.minor(this, "Both source and destination
disconnected: "+msg+" for "+this);
Modified: trunk/freenet/src/freenet/node/updater/NodeUpdateManager.java
===================================================================
--- trunk/freenet/src/freenet/node/updater/NodeUpdateManager.java
2008-09-19 16:53:54 UTC (rev 22707)
+++ trunk/freenet/src/freenet/node/updater/NodeUpdateManager.java
2008-09-19 17:39:53 UTC (rev 22708)
@@ -192,7 +192,7 @@
if((!hasBeenBlown) && (mainUpdater == null ||
mainUpdater.getFetchedVersion() <= 0)) return;
}
try {
- peer.sendAsync(getUOMAnnouncement(), null, 0, ctr);
+ peer.sendAsync(getUOMAnnouncement(), null, ctr);
} catch (NotConnectedException e) {
// Sad, but ignore it
}
Modified: trunk/freenet/src/freenet/node/updater/UpdateOverMandatoryManager.java
===================================================================
--- trunk/freenet/src/freenet/node/updater/UpdateOverMandatoryManager.java
2008-09-19 16:53:54 UTC (rev 22707)
+++ trunk/freenet/src/freenet/node/updater/UpdateOverMandatoryManager.java
2008-09-19 17:39:53 UTC (rev 22708)
@@ -181,7 +181,7 @@
public void sent() {
// Cool
}
- }, 0, updateManager.ctr);
+ }, updateManager.ctr);
// The reply message will start the
transfer. It includes the revocation URI
// so we can tell if anything wierd is
happening.
@@ -340,7 +340,7 @@
}
}, REQUEST_MAIN_JAR_TIMEOUT);
}
- }, 0, updateManager.ctr);
+ }, updateManager.ctr);
} catch (NotConnectedException e) {
synchronized(this) {
nodesAskedSendMainJar.remove(source);
@@ -616,7 +616,7 @@
return super.toString() +
"("+uid+":"+source.getPeer()+")";
}
- }, 0, updateManager.ctr);
+ }, updateManager.ctr);
} catch (NotConnectedException e) {
Logger.error(this, "Peer "+source+" asked us for the
blob file for the revocation key, then disconnected when we tried to send the
UOMSendingRevocation: "+e, e);
return true;
@@ -898,7 +898,7 @@
private void cancelSend(PeerNode source, long uid) {
Message msg = DMT.createFNPBulkReceiveAborted(uid);
try {
- source.sendAsync(msg, null, 0, updateManager.ctr);
+ source.sendAsync(msg, null, updateManager.ctr);
} catch (NotConnectedException e1) {
// Ignore
}
@@ -991,7 +991,7 @@
return super.toString() +
"("+uid+":"+source.getPeer()+")";
}
- }, 0, updateManager.ctr);
+ }, updateManager.ctr);
} catch (NotConnectedException e) {
Logger.error(this, "Peer "+source+" asked us for the
blob file for the main jar, then disconnected when we tried to send the
UOMSendingMain: "+e, e);
return true;