Author: robert
Date: 2007-12-06 20:24:08 +0000 (Thu, 06 Dec 2007)
New Revision: 16372
Modified:
trunk/freenet/src/freenet/node/MessageItem.java
trunk/freenet/src/freenet/node/RequestHandler.java
Log:
Don't hang onto a RequestHandler thread just to track the bytes.
Modified: trunk/freenet/src/freenet/node/MessageItem.java
===================================================================
--- trunk/freenet/src/freenet/node/MessageItem.java 2007-12-06 19:40:42 UTC
(rev 16371)
+++ trunk/freenet/src/freenet/node/MessageItem.java 2007-12-06 20:24:08 UTC
(rev 16372)
@@ -54,6 +54,7 @@
}
public void onSent(int length) {
+ //NB: The fact that the bytes are counted before callback
notifications is important for load management.
if(ctrCallback != null) {
try {
ctrCallback.sentBytes(length);
Modified: trunk/freenet/src/freenet/node/RequestHandler.java
===================================================================
--- trunk/freenet/src/freenet/node/RequestHandler.java 2007-12-06 19:40:42 UTC
(rev 16371)
+++ trunk/freenet/src/freenet/node/RequestHandler.java 2007-12-06 20:24:08 UTC
(rev 16372)
@@ -4,6 +4,7 @@
package freenet.node;
import freenet.crypt.DSAPublicKey;
+import freenet.io.comm.AsyncMessageCallback;
import freenet.io.comm.ByteCounter;
import freenet.io.comm.DMT;
import freenet.io.comm.Message;
@@ -82,8 +83,12 @@
} finally {
node.removeTransferringRequestHandler(uid);
node.unlockUID(uid, key instanceof NodeSSK, false, false);
- if((!finalTransferFailed) && rs != null && status !=
RequestSender.TIMED_OUT && status != RequestSender.GENERATED_REJECTED_OVERLOAD
- && status != RequestSender.INTERNAL_ERROR && !thrown) {
+ }
+ }
+
+ private void applyByteCounts() {
+ if((!finalTransferFailed) && rs != null && status !=
RequestSender.TIMED_OUT && status != RequestSender.GENERATED_REJECTED_OVERLOAD
+ && status != RequestSender.INTERNAL_ERROR) {
int sent, rcvd;
synchronized(bytesSync) {
sent = sentBytes;
@@ -99,6 +104,7 @@
// Can report both parts, because we had both a
Handler and a Sender
node.nodeStats.successfulSskFetchBytesSentAverage.report(sent);
node.nodeStats.successfulSskFetchBytesReceivedAverage.report(rcvd);
+ node.sentPayload(rs.getSSKData().length); // won't be
sentPayload()ed by BlockTransmitter
}
} else {
if(logMINOR) Logger.minor(this, "Remote CHK fetch cost
"+sent+ '/' +rcvd+" bytes ("+status+ ')');
@@ -111,8 +117,6 @@
}
}
}
-
- }
}
private void realRun() throws NotConnectedException {
@@ -127,13 +131,13 @@
if(o instanceof KeyBlock) {
KeyBlock block = (KeyBlock) o;
Message df = createDataFound(block);
- source.sendSync(df, null);
+ source.sendAsync(df, null, 0, this);
if(key instanceof NodeSSK) {
if(needsPubKey) {
DSAPublicKey key =
((NodeSSK)block.getKey()).getPubKey();
Message pk = DMT.createFNPSSKPubKey(uid, key);
if(logMINOR) Logger.minor(this, "Sending PK: "+key+ ' '
+key.toLongString());
- source.sendSync(pk, null);
+ sendTerminal(pk);
}
status = RequestSender.SUCCESS; // for byte logging
}
@@ -144,11 +148,14 @@
new BlockTransmitter(node.usm, source, uid, prb,
node.outputThrottle, this);
node.addTransferringRequestHandler(uid);
if(bt.send(node.executor)) {
- status = RequestSender.SUCCESS; // for byte logging
+ // for byte logging
+ status = RequestSender.SUCCESS;
// We've fetched it from our datastore, so there won't
be a downstream noderef.
// But we want to send at least an
FNPOpennetCompletedAck, otherwise the request source
// may have to timeout waiting for one.
finishOpennetNoRelay();
+ //also for byte logging, since the block is the 'terminal'
message.
+ applyByteCounts();
}
}
return;
@@ -157,8 +164,8 @@
if(rs == null) { // ran out of htl?
Message dnf = DMT.createFNPDataNotFound(uid);
- source.sendSync(dnf, null);
status = RequestSender.DATA_NOT_FOUND; // for byte logging
+ sendTerminal(dnf);
return;
}
@@ -172,13 +179,14 @@
if((waitStatus & RequestSender.WAIT_REJECTED_OVERLOAD) != 0) {
// Forward RejectedOverload
Message msg = DMT.createFNPRejectedOverload(uid, false);
- source.sendAsync(msg, null, 0, null);
+ source.sendAsync(msg, null, 0, this);
}
if((waitStatus & RequestSender.WAIT_TRANSFERRING_DATA) != 0) {
// Is a CHK.
Message df = DMT.createFNPCHKDataFound(uid, rs.getHeaders());
- source.sendSync(df, null);
+ source.sendAsync(df, null, 0, this);
+
PartiallyReceivedBlock prb = rs.getPRB();
BlockTransmitter bt =
new BlockTransmitter(node.usm, source, uid, prb,
node.outputThrottle, this);
@@ -190,6 +198,8 @@
finishOpennetChecked();
}
status = rs.getStatus();
+ //for byte logging, since the block is the 'terminal' message.
+ applyByteCounts();
return;
}
@@ -201,11 +211,11 @@
case RequestSender.NOT_FINISHED:
case RequestSender.DATA_NOT_FOUND:
Message dnf = DMT.createFNPDataNotFound(uid);
- source.sendSync(dnf, this);
+ sendTerminal(dnf);
return;
case RequestSender.RECENTLY_FAILED:
Message rf = DMT.createFNPRecentlyFailed(uid,
rs.getRecentlyFailedTimeLeft());
- source.sendSync(rf, this);
+ sendTerminal(rf);
return;
case RequestSender.GENERATED_REJECTED_OVERLOAD:
case RequestSender.TIMED_OUT:
@@ -213,21 +223,22 @@
// Locally generated.
// Propagate back to source who needs to reduce send rate
Message reject = DMT.createFNPRejectedOverload(uid, true);
- source.sendSync(reject, this);
+ sendTerminal(reject);
return;
case RequestSender.ROUTE_NOT_FOUND:
// Tell source
Message rnf = DMT.createFNPRouteNotFound(uid, rs.getHTL());
- source.sendSync(rnf, this);
+ sendTerminal(rnf);
return;
case RequestSender.SUCCESS:
if(key instanceof NodeSSK) {
Message df = DMT.createFNPSSKDataFound(uid,
rs.getHeaders(), rs.getSSKData());
- source.sendSync(df, this);
- node.sentPayload(rs.getSSKData().length); // won't be
sentPayload()ed by BlockTransmitter
if(needsPubKey) {
+ source.sendAsync(df, null, 0, this);
Message pk = DMT.createFNPSSKPubKey(uid,
((NodeSSK)rs.getSSKBlock().getKey()).getPubKey());
- source.sendSync(pk, this);
+ sendTerminal(pk);
+ } else {
+ sendTerminal(df);
}
return;
} else {
@@ -244,7 +255,7 @@
continue; // should have started transfer
}
reject = DMT.createFNPRejectedOverload(uid, true);
- source.sendSync(reject, this);
+ sendTerminal(reject);
return;
case RequestSender.TRANSFER_FAILED:
if(key instanceof NodeCHK) {
@@ -262,6 +273,49 @@
}
/**
+ * Sends the 'final' packet of a request in such a way that the thread can
be freed (made non-runnable/exit)
+ * and the byte counter will still be accurate.
+ */
+ private void sendTerminal(Message msg) throws NotConnectedException {
+ if (sendTerminalCalled)
+ throw new IllegalStateException("sendTerminal should only be
called once");
+ else
+ sendTerminalCalled=true;
+
+ source.sendAsync(msg, new TerminalMessageByteCountCollector(), 0,
this);
+ }
+
+ boolean sendTerminalCalled=false;
+
+ /**
+ * Note well! These functions are not executed on the RequestHandler
thread.
+ */
+ private class TerminalMessageByteCountCollector implements
AsyncMessageCallback {
+
+ public void acknowledged() {
+ //terminalMessage ack'd by remote peer
+ }
+
+ public void disconnected() {
+ Logger.minor(this, "Peer disconnected before terminal message sent
for "+RequestHandler.this);
+ }
+
+ public void fatalError() {
+ Logger.error(this, "Error sending terminal message?!
for " + RequestHandler.this);
+ }
+
+ private boolean once=true;
+ public void sent() {
+ //For byte counting, this relies on the fact that the callback
will only be excuted once. This check might be paranoid.
+ if (once) {
+ applyByteCounts();
+ } else {
+ Logger.error(this, "terminalMessage sent multiple times? for "
+ RequestHandler.this);
+ }
+ }
+ }
+
+ /**
* Either send an ack, indicating we've finished and aren't interested in
opennet,
* or wait for a noderef and relay it and wait for a response and relay
that,
* or send our own noderef and wait for a response and add that.
@@ -406,6 +460,7 @@
if(block instanceof CHKBlock)
return DMT.createFNPCHKDataFound(uid,
block.getRawHeaders());
else if(block instanceof SSKBlock) {
+ // FIXME called before payload is actually sent
node.sentPayload(block.getRawData().length); // won't
be sentPayload()ed by BlockTransmitter
return DMT.createFNPSSKDataFound(uid,
block.getRawHeaders(), block.getRawData());
} else
@@ -431,5 +486,5 @@
public void sentPayload(int x) {
node.sentPayload(x);
}
-
+
}