Author: toad
Date: 2006-07-10 23:35:54 +0000 (Mon, 10 Jul 2006)
New Revision: 9553
Modified:
trunk/freenet/src/freenet/node/CHKInsertSender.java
trunk/freenet/src/freenet/node/InsertHandler.java
trunk/freenet/src/freenet/node/Node.java
trunk/freenet/src/freenet/node/RequestHandler.java
trunk/freenet/src/freenet/node/RequestSender.java
trunk/freenet/src/freenet/node/SSKInsertHandler.java
trunk/freenet/src/freenet/node/SSKInsertSender.java
trunk/freenet/src/freenet/node/Version.java
Log:
871: Take into account bandwidth used by receiving requests/inserts as well as
sending them when determining the bandwidth cost of a request.
Becomes mandatory at 0:00 GMT on Monday the 17th.
Modified: trunk/freenet/src/freenet/node/CHKInsertSender.java
===================================================================
--- trunk/freenet/src/freenet/node/CHKInsertSender.java 2006-07-10 23:27:27 UTC
(rev 9552)
+++ trunk/freenet/src/freenet/node/CHKInsertSender.java 2006-07-10 23:35:54 UTC
(rev 9553)
@@ -188,6 +188,8 @@
static final int GENERATED_REJECTED_OVERLOAD = 5;
/** Could not get off the node at all! */
static final int ROUTE_REALLY_NOT_FOUND = 6;
+ /** Receive failed. Not used internally; only used by InsertHandler. */
+ static final int RECEIVE_FAILED = 7;
public String toString() {
return super.toString()+" for "+uid;
@@ -554,15 +556,6 @@
notifyAll();
}
- if((code != TIMED_OUT) && (code != GENERATED_REJECTED_OVERLOAD) &&
(code != INTERNAL_ERROR)
- && (code != ROUTE_REALLY_NOT_FOUND)) {
- Logger.minor(this, "CHK insert cost
"+getTotalSentBytes()+"/"+getTotalReceivedBytes()+" bytes ("+code+")");
- (source == null ? node.localChkInsertBytesSentAverage :
node.remoteChkInsertBytesSentAverage)
- .report(getTotalSentBytes());
- (source == null ? node.localChkInsertBytesReceivedAverage :
node.remoteChkInsertBytesReceivedAverage)
- .report(getTotalReceivedBytes());
- }
-
Logger.minor(this, "Returning from finish()");
}
Modified: trunk/freenet/src/freenet/node/InsertHandler.java
===================================================================
--- trunk/freenet/src/freenet/node/InsertHandler.java 2006-07-10 23:27:27 UTC
(rev 9552)
+++ trunk/freenet/src/freenet/node/InsertHandler.java 2006-07-10 23:35:54 UTC
(rev 9553)
@@ -22,7 +22,7 @@
* Handle an incoming insert request.
* This corresponds to RequestHandler.
*/
-public class InsertHandler implements Runnable {
+public class InsertHandler implements Runnable, ByteCounter {
static final int DATA_INSERT_TIMEOUT = 10000;
@@ -80,7 +80,7 @@
// Send Accepted
Message accepted = DMT.createFNPAccepted(uid);
try {
- source.send(accepted, null);
+ source.send(accepted, this);
} catch (NotConnectedException e1) {
Logger.minor(this, "Lost connection to source");
return;
@@ -93,7 +93,7 @@
Message msg;
try {
- msg = node.usm.waitFor(mf, null);
+ msg = node.usm.waitFor(mf, this);
} catch (DisconnectedException e) {
Logger.normal(this, "Disconnected while waiting for DataInsert on
"+uid);
return;
@@ -106,11 +106,11 @@
if(source.isConnected() && (startTime >
(source.timeLastConnected()+Node.HANDSHAKE_TIMEOUT*4)))
Logger.error(this, "Did not receive DataInsert
on "+uid+" from "+source+" !");
Message tooSlow = DMT.createFNPRejectedTimeout(uid);
- source.sendAsync(tooSlow, null, 0, null);
+ source.sendAsync(tooSlow, null, 0, this);
Message m = DMT.createFNPInsertTransfersCompleted(uid,
true);
- source.sendAsync(m, null, 0, null);
+ source.sendAsync(m, null, 0, this);
prb = new PartiallyReceivedBlock(Node.PACKETS_IN_BLOCK,
Node.PACKET_SIZE);
- br = new BlockReceiver(node.usm, source, uid, prb,
null);
+ br = new BlockReceiver(node.usm, source, uid, prb,
this);
prb.abort(RetrievalException.NO_DATAINSERT, "No
DataInsert");
br.sendAborted(RetrievalException.NO_DATAINSERT, "No
DataInsert");
return;
@@ -132,7 +132,7 @@
prb = new PartiallyReceivedBlock(Node.PACKETS_IN_BLOCK,
Node.PACKET_SIZE);
if(htl > 0)
sender = node.makeInsertSender(key, htl, uid, source, headers,
prb, false, closestLoc, true);
- br = new BlockReceiver(node.usm, source, uid, prb, null);
+ br = new BlockReceiver(node.usm, source, uid, prb, this);
// Receive the data, off thread
@@ -146,11 +146,11 @@
msg = DMT.createFNPInsertReply(uid);
sentSuccess = true;
try {
- source.send(msg, null);
+ source.send(msg, this);
} catch (NotConnectedException e) {
// Ignore
}
- finish();
+ finish(CHKInsertSender.SUCCESS);
return;
}
@@ -186,7 +186,7 @@
// Cancel the sender
sender.receiveFailed(); // tell it to stop if it hasn't
already failed... unless it's sending from store
// Nothing else we can do
- finish();
+ finish(CHKInsertSender.RECEIVE_FAILED);
return;
}
@@ -195,7 +195,7 @@
// Forward it
Message m = DMT.createFNPRejectedOverload(uid, false);
try {
- source.send(m, null);
+ source.send(m, this);
} catch (NotConnectedException e) {
Logger.minor(this, "Lost connection to
source");
return;
@@ -225,7 +225,7 @@
(status == CHKInsertSender.INTERNAL_ERROR)) {
msg = DMT.createFNPRejectedOverload(uid, true);
try {
- source.send(msg, null);
+ source.send(msg, this);
} catch (NotConnectedException e) {
Logger.minor(this, "Lost connection to
source");
return;
@@ -234,20 +234,20 @@
if((status == CHKInsertSender.TIMED_OUT) ||
(status ==
CHKInsertSender.GENERATED_REJECTED_OVERLOAD))
canCommit = true;
- finish();
+ finish(status);
return;
}
if((status == CHKInsertSender.ROUTE_NOT_FOUND) || (status ==
CHKInsertSender.ROUTE_REALLY_NOT_FOUND)) {
msg = DMT.createFNPRouteNotFound(uid, sender.getHTL());
try {
- source.send(msg, null);
+ source.send(msg, this);
} catch (NotConnectedException e) {
Logger.minor(this, "Lost connection to
source");
return;
}
canCommit = true;
- finish();
+ finish(status);
return;
}
@@ -255,13 +255,13 @@
msg = DMT.createFNPInsertReply(uid);
sentSuccess = true;
try {
- source.send(msg, null);
+ source.send(msg, this);
} catch (NotConnectedException e) {
Logger.minor(this, "Lost connection to
source");
return;
}
canCommit = true;
- finish();
+ finish(status);
return;
}
@@ -269,11 +269,11 @@
Logger.error(this, "Unknown status code:
"+sender.getStatusString());
msg = DMT.createFNPRejectedOverload(uid, true);
try {
- source.send(msg, null);
+ source.send(msg, this);
} catch (NotConnectedException e) {
// Ignore
}
- finish();
+ finish(CHKInsertSender.INTERNAL_ERROR);
return;
}
}
@@ -286,7 +286,7 @@
* If canCommit, and we have received all the data, and it
* verifies, then commit it.
*/
- private void finish() {
+ private void finish(int code) {
Logger.minor(this, "Finishing");
maybeCommit();
@@ -314,13 +314,27 @@
boolean failed = sender.anyTransfersFailed();
Message m = DMT.createFNPInsertTransfersCompleted(uid, failed);
try {
- source.sendAsync(m, null, 0, null);
+ source.send(m, this);
Logger.minor(this, "Sent completion: "+failed+" for
"+this);
} catch (NotConnectedException e1) {
Logger.minor(this, "Not connected: "+source+" for
"+this);
// May need to commit anyway...
}
}
+
+ if(code != CHKInsertSender.TIMED_OUT && code !=
CHKInsertSender.GENERATED_REJECTED_OVERLOAD &&
+ code != CHKInsertSender.INTERNAL_ERROR && code !=
CHKInsertSender.ROUTE_REALLY_NOT_FOUND &&
+ code != CHKInsertSender.RECEIVE_FAILED) {
+ int totalSent = getTotalSentBytes();
+ int totalReceived = getTotalReceivedBytes();
+ if(sender != null) {
+ totalSent += sender.getTotalSentBytes();
+ totalReceived += sender.getTotalReceivedBytes();
+ }
+ Logger.minor(this, "Remote CHK insert cost
"+totalSent+"/"+totalReceived+" bytes ("+code+")");
+ node.remoteChkInsertBytesSentAverage.report(totalSent);
+ node.remoteChkInsertBytesReceivedAverage.report(totalReceived);
+ }
}
private void maybeCommit() {
@@ -344,7 +358,7 @@
}
if(toSend != null) {
try {
- source.sendAsync(toSend, null, 0, null);
+ source.sendAsync(toSend, null, 0, this);
} catch (NotConnectedException e) {
// :(
Logger.minor(this, "Lost connection in "+this+" when sending
FNPDataInsertRejected");
@@ -368,7 +382,7 @@
runThread.interrupt();
Message msg = DMT.createFNPDataInsertRejected(uid,
DMT.DATA_INSERT_REJECTED_RECEIVE_FAILED);
try {
- source.send(msg, null);
+ source.send(msg, InsertHandler.this);
} catch (NotConnectedException ex) {
Logger.error(this, "Can't send "+msg+" to "+source+":
"+ex);
}
@@ -385,4 +399,27 @@
}
+ private final Object totalSync = new Object();
+ private int totalSentBytes;
+ private int totalReceivedBytes;
+
+ public void sentBytes(int x) {
+ synchronized(totalSync) {
+ totalSentBytes += x;
+ }
+ }
+
+ public void receivedBytes(int x) {
+ synchronized(totalSync) {
+ totalReceivedBytes += x;
+ }
+ }
+
+ public int getTotalSentBytes() {
+ return totalSentBytes;
+ }
+
+ public int getTotalReceivedBytes() {
+ return totalReceivedBytes;
+ }
}
Modified: trunk/freenet/src/freenet/node/Node.java
===================================================================
--- trunk/freenet/src/freenet/node/Node.java 2006-07-10 23:27:27 UTC (rev
9552)
+++ trunk/freenet/src/freenet/node/Node.java 2006-07-10 23:35:54 UTC (rev
9553)
@@ -1585,23 +1585,24 @@
// Select the request scheduler
- localChkFetchBytesSentAverage = new
TimeDecayingRunningAverage(0.0, 180000, 0.0, Long.MAX_VALUE);
- localSskFetchBytesSentAverage = new
TimeDecayingRunningAverage(0.0, 180000, 0.0, Long.MAX_VALUE);
+ // Guesstimates. Hopefully well over the reality.
+ localChkFetchBytesSentAverage = new
TimeDecayingRunningAverage(500, 180000, 0.0, Long.MAX_VALUE);
+ localSskFetchBytesSentAverage = new
TimeDecayingRunningAverage(500, 180000, 0.0, Long.MAX_VALUE);
localChkInsertBytesSentAverage = new
TimeDecayingRunningAverage(32768, 180000, 0.0, Long.MAX_VALUE);
localSskInsertBytesSentAverage = new
TimeDecayingRunningAverage(2048, 180000, 0.0, Long.MAX_VALUE);
localChkFetchBytesReceivedAverage = new
TimeDecayingRunningAverage(32768, 180000, 0.0, Long.MAX_VALUE);
localSskFetchBytesReceivedAverage = new
TimeDecayingRunningAverage(2048, 180000, 0.0, Long.MAX_VALUE);
- localChkInsertBytesReceivedAverage = new
TimeDecayingRunningAverage(0.0, 180000, 0.0, Long.MAX_VALUE);
- localSskInsertBytesReceivedAverage = new
TimeDecayingRunningAverage(0.0, 180000, 0.0, Long.MAX_VALUE);
+ localChkInsertBytesReceivedAverage = new
TimeDecayingRunningAverage(1024, 180000, 0.0, Long.MAX_VALUE);
+ localSskInsertBytesReceivedAverage = new
TimeDecayingRunningAverage(500, 180000, 0.0, Long.MAX_VALUE);
- remoteChkFetchBytesSentAverage = new
TimeDecayingRunningAverage(0.0, 180000, 0.0, Long.MAX_VALUE);
- remoteSskFetchBytesSentAverage = new
TimeDecayingRunningAverage(0.0, 180000, 0.0, Long.MAX_VALUE);
- remoteChkInsertBytesSentAverage = new
TimeDecayingRunningAverage(32768, 180000, 0.0, Long.MAX_VALUE);
- remoteSskInsertBytesSentAverage = new
TimeDecayingRunningAverage(2048, 180000, 0.0, Long.MAX_VALUE);
- remoteChkFetchBytesReceivedAverage = new
TimeDecayingRunningAverage(32768, 180000, 0.0, Long.MAX_VALUE);
- remoteSskFetchBytesReceivedAverage = new
TimeDecayingRunningAverage(2048, 180000, 0.0, Long.MAX_VALUE);
- remoteChkInsertBytesReceivedAverage = new
TimeDecayingRunningAverage(0.0, 180000, 0.0, Long.MAX_VALUE);
- remoteSskInsertBytesReceivedAverage = new
TimeDecayingRunningAverage(0.0, 180000, 0.0, Long.MAX_VALUE);
+ remoteChkFetchBytesSentAverage = new
TimeDecayingRunningAverage(32768+1024+500, 180000, 0.0, Long.MAX_VALUE);
+ remoteSskFetchBytesSentAverage = new
TimeDecayingRunningAverage(1024+1024+500, 180000, 0.0, Long.MAX_VALUE);
+ remoteChkInsertBytesSentAverage = new
TimeDecayingRunningAverage(32768+32768+1024, 180000, 0.0, Long.MAX_VALUE);
+ remoteSskInsertBytesSentAverage = new
TimeDecayingRunningAverage(1024+1024+500, 180000, 0.0, Long.MAX_VALUE);
+ remoteChkFetchBytesReceivedAverage = new
TimeDecayingRunningAverage(32768+1024+500, 180000, 0.0, Long.MAX_VALUE);
+ remoteSskFetchBytesReceivedAverage = new
TimeDecayingRunningAverage(2048+500, 180000, 0.0, Long.MAX_VALUE);
+ remoteChkInsertBytesReceivedAverage = new
TimeDecayingRunningAverage(32768+1024+500, 180000, 0.0, Long.MAX_VALUE);
+ remoteSskInsertBytesReceivedAverage = new
TimeDecayingRunningAverage(1024+1024+500, 180000, 0.0, Long.MAX_VALUE);
// FIXME make all the below arbitrary constants configurable!
@@ -1882,6 +1883,12 @@
if(status == RequestSender.NOT_FINISHED)
continue;
+ if(status != RequestSender.TIMED_OUT && status !=
RequestSender.GENERATED_REJECTED_OVERLOAD && status !=
RequestSender.INTERNAL_ERROR) {
+ Logger.minor(this, "CHK fetch cost
"+rs.getTotalSentBytes()+"/"+rs.getTotalReceivedBytes()+" bytes ("+status+")");
+ localChkFetchBytesSentAverage.report(rs.getTotalSentBytes());
+
localChkFetchBytesReceivedAverage.report(rs.getTotalReceivedBytes());
+ }
+
if((status == RequestSender.TIMED_OUT) ||
(status ==
RequestSender.GENERATED_REJECTED_OVERLOAD)) {
if(!rejectedOverload) {
@@ -1970,6 +1977,12 @@
if(status == RequestSender.NOT_FINISHED)
continue;
+
+ if(status != RequestSender.TIMED_OUT && status !=
RequestSender.GENERATED_REJECTED_OVERLOAD && status !=
RequestSender.INTERNAL_ERROR) {
+ Logger.minor(this, "SSK fetch cost
"+rs.getTotalSentBytes()+"/"+rs.getTotalReceivedBytes()+" bytes ("+status+")");
+ localSskFetchBytesSentAverage.report(rs.getTotalSentBytes());
+
localSskFetchBytesReceivedAverage.report(rs.getTotalReceivedBytes());
+ }
if((status == RequestSender.TIMED_OUT) ||
(status ==
RequestSender.GENERATED_REJECTED_OVERLOAD)) {
@@ -2106,11 +2119,20 @@
}
}
- if(is.getStatus() == CHKInsertSender.SUCCESS) {
+ int status = is.getStatus();
+ if(status != CHKInsertSender.TIMED_OUT && status !=
CHKInsertSender.GENERATED_REJECTED_OVERLOAD && status !=
CHKInsertSender.INTERNAL_ERROR
+ && status != CHKInsertSender.ROUTE_REALLY_NOT_FOUND) {
+ int sent = is.getTotalSentBytes();
+ int received = is.getTotalReceivedBytes();
+ Logger.minor(this, "Local CHK insert cost "+sent+"/"+received+"
bytes ("+status+")");
+ localChkInsertBytesSentAverage.report(sent);
+ localChkInsertBytesReceivedAverage.report(received);
+ }
+
+ if(status == CHKInsertSender.SUCCESS) {
Logger.normal(this, "Succeeded inserting "+block);
return;
} else {
- int status = is.getStatus();
String msg = "Failed inserting "+block+" :
"+is.getStatusString();
if(status == CHKInsertSender.ROUTE_NOT_FOUND)
msg += " - this is normal on small networks;
the data will still be propagated, but it can't find the 20+ nodes needed for
full success";
@@ -2203,6 +2225,17 @@
}
}
+ int status = is.getStatus();
+
+ if(status != CHKInsertSender.TIMED_OUT && status !=
CHKInsertSender.GENERATED_REJECTED_OVERLOAD && status !=
CHKInsertSender.INTERNAL_ERROR
+ && status != CHKInsertSender.ROUTE_REALLY_NOT_FOUND) {
+ int sent = is.getTotalSentBytes();
+ int received = is.getTotalReceivedBytes();
+ Logger.minor(this, "Local SSK insert cost "+sent+"/"+received+"
bytes ("+status+")");
+ localSskInsertBytesSentAverage.report(sent);
+ localSskInsertBytesReceivedAverage.report(received);
+ }
+
if(is.hasCollided()) {
// Store it locally so it can be fetched immediately,
and overwrites any locally inserted.
try {
@@ -2215,11 +2248,10 @@
throw new
LowLevelPutException(LowLevelPutException.COLLISION);
}
- if(is.getStatus() == SSKInsertSender.SUCCESS) {
+ if(status == SSKInsertSender.SUCCESS) {
Logger.normal(this, "Succeeded inserting "+block);
return;
} else {
- int status = is.getStatus();
String msg = "Failed inserting "+block+" :
"+is.getStatusString();
if(status == CHKInsertSender.ROUTE_NOT_FOUND)
msg += " - this is normal on small networks;
the data will still be propagated, but it can't find the 20+ nodes needed for
full success";
@@ -2266,13 +2298,16 @@
double expected =
(isInsert ? (isSSK ?
this.remoteSskInsertBytesSentAverage : this.remoteChkInsertBytesSentAverage)
: (isSSK ?
this.remoteSskFetchBytesSentAverage :
this.remoteChkFetchBytesSentAverage)).currentValue();
- int e = (int)Math.max(expected, 0);
- if(!requestOutputThrottle.instantGrab(e)) return "Insufficient
output bandwidth";
+ int expectedSent = (int)Math.max(expected, 0);
+ if(!requestOutputThrottle.instantGrab(expectedSent)) return
"Insufficient output bandwidth";
expected =
(isInsert ? (isSSK ?
this.remoteSskInsertBytesReceivedAverage :
this.remoteChkInsertBytesReceivedAverage)
: (isSSK ?
this.remoteSskFetchBytesReceivedAverage :
this.remoteChkFetchBytesReceivedAverage)).currentValue();
- e = (int)Math.max(expected, 0);
- if(!requestInputThrottle.instantGrab(e)) return "Insufficient
input bandwidth";
+ int expectedReceived = (int)Math.max(expected, 0);
+ if(!requestInputThrottle.instantGrab(expectedReceived)) {
+ requestOutputThrottle.recycle(expectedSent);
+ return "Insufficient input bandwidth";
+ }
// If no recent reports, no packets have been sent; correct the
average downwards.
Modified: trunk/freenet/src/freenet/node/RequestHandler.java
===================================================================
--- trunk/freenet/src/freenet/node/RequestHandler.java 2006-07-10 23:27:27 UTC
(rev 9552)
+++ trunk/freenet/src/freenet/node/RequestHandler.java 2006-07-10 23:35:54 UTC
(rev 9553)
@@ -18,7 +18,7 @@
* is separated off into RequestSender so we get transfer coalescing
* and both ends for free.
*/
-public class RequestHandler implements Runnable {
+public class RequestHandler implements Runnable, ByteCounter {
final Message req;
final Node node;
@@ -28,6 +28,7 @@
private double closestLoc;
private boolean needsPubKey;
final Key key;
+ private boolean finalTransferFailed = false;
public String toString() {
return super.toString()+" for "+uid;
@@ -51,6 +52,8 @@
}
public void run() {
+ int status = RequestSender.NOT_FINISHED;
+ RequestSender rs = null;
try {
Logger.minor(this, "Handling a request: "+uid);
htl = source.decrementHTL(htl);
@@ -70,21 +73,24 @@
Logger.minor(this, "Sending PK: "+key+"
"+key.writeAsField());
source.send(pk, null);
}
+ status = RequestSender.SUCCESS; // for byte logging
}
if(block instanceof CHKBlock) {
PartiallyReceivedBlock prb =
new PartiallyReceivedBlock(Node.PACKETS_IN_BLOCK,
Node.PACKET_SIZE, block.getRawData());
BlockTransmitter bt =
new BlockTransmitter(node.usm, source, uid, prb,
node.outputThrottle, null);
- bt.send();
+ if(bt.send())
+ status = RequestSender.SUCCESS; // for byte logging
}
return;
}
- RequestSender rs = (RequestSender) o;
+ rs = (RequestSender) o;
if(rs == null) { // ran out of htl?
Message dnf = DMT.createFNPDataNotFound(uid);
source.send(dnf, null);
+ status = RequestSender.DATA_NOT_FOUND; // for byte logging
return;
}
@@ -105,18 +111,20 @@
PartiallyReceivedBlock prb = rs.getPRB();
BlockTransmitter bt =
new BlockTransmitter(node.usm, source, uid, prb,
node.outputThrottle, null);
- bt.send(); // either fails or succeeds; other side will see, we
don't care
+ if(!bt.send());
+ finalTransferFailed = true;
return;
}
- int status = rs.getStatus();
+ status = rs.getStatus();
+
+ if(status == RequestSender.NOT_FINISHED) continue;
switch(status) {
case RequestSender.NOT_FINISHED:
- continue;
case RequestSender.DATA_NOT_FOUND:
Message dnf = DMT.createFNPDataNotFound(uid);
- source.sendAsync(dnf, null, 0, null);
+ source.send(dnf, this);
return;
case RequestSender.GENERATED_REJECTED_OVERLOAD:
case RequestSender.TIMED_OUT:
@@ -124,19 +132,20 @@
// Locally generated.
// Propagate back to source who needs to reduce send rate
Message reject = DMT.createFNPRejectedOverload(uid, true);
- source.sendAsync(reject, null, 0, null);
+ source.send(reject, this);
return;
case RequestSender.ROUTE_NOT_FOUND:
// Tell source
Message rnf = DMT.createFNPRouteNotFound(uid, rs.getHTL());
- source.sendAsync(rnf, null, 0, null);
+ source.send(rnf, this);
return;
case RequestSender.SUCCESS:
if(key instanceof NodeSSK) {
Message df = DMT.createFNPSSKDataFound(uid,
rs.getHeaders(), rs.getSSKData());
- source.send(df, null);
+ source.send(df, this);
if(needsPubKey) {
- source.send(df, null);
+ Message pk = DMT.createFNPSSKPubKey(uid,
((NodeSSK)rs.getSSKBlock().getKey()).getPubKey().asBytes());
+ source.send(pk, this);
}
} else if(!rs.transferStarted()) {
Logger.error(this, "Status is SUCCESS but we
never started a transfer on "+uid);
@@ -150,7 +159,7 @@
continue; // should have started transfer
}
reject = DMT.createFNPRejectedOverload(uid, true);
- source.sendAsync(reject, null, 0, null);
+ source.send(reject, this);
return;
case RequestSender.TRANSFER_FAILED:
if(key instanceof NodeCHK) {
@@ -169,6 +178,21 @@
Logger.error(this, "Caught "+t, t);
} finally {
node.unlockUID(uid);
+ if((!finalTransferFailed) && rs != null && status !=
RequestSender.TIMED_OUT && status != RequestSender.GENERATED_REJECTED_OVERLOAD
+ && status != RequestSender.INTERNAL_ERROR) {
+ int sent = rs.getTotalSentBytes() + sentBytes;
+ int rcvd = rs.getTotalReceivedBytes() + receivedBytes;
+ if(key instanceof NodeSSK) {
+ Logger.minor(this, "Remote SSK fetch cost
"+sent+"/"+rcvd+" bytes ("+status+")");
+ node.remoteSskFetchBytesSentAverage.report(sent);
+ node.remoteSskFetchBytesReceivedAverage.report(rcvd);
+ } else {
+ Logger.minor(this, "Remote CHK fetch cost
"+sent+"/"+rcvd+" bytes ("+status+")");
+ node.remoteChkFetchBytesSentAverage.report(sent);
+ node.remoteChkFetchBytesReceivedAverage.report(rcvd);
+ }
+ }
+
}
}
@@ -181,4 +205,20 @@
throw new IllegalStateException("Unknown key block
type: "+block.getClass());
}
+ private int sentBytes;
+ private int receivedBytes;
+ private final Object bytesSync = new Object();
+
+ public void sentBytes(int x) {
+ synchronized(bytesSync) {
+ sentBytes += x;
+ }
+ }
+
+ public void receivedBytes(int x) {
+ synchronized(bytesSync) {
+ receivedBytes += x;
+ }
+ }
+
}
Modified: trunk/freenet/src/freenet/node/RequestSender.java
===================================================================
--- trunk/freenet/src/freenet/node/RequestSender.java 2006-07-10 23:27:27 UTC
(rev 9552)
+++ trunk/freenet/src/freenet/node/RequestSender.java 2006-07-10 23:35:54 UTC
(rev 9553)
@@ -499,22 +499,6 @@
throw new IllegalStateException("finish() called with "+code+"
when was already "+status);
status = code;
- if((status != TIMED_OUT) && (status != GENERATED_REJECTED_OVERLOAD) &&
(status != INTERNAL_ERROR)) {
- if(key instanceof NodeSSK) {
- Logger.minor(this, "SSK fetch cost
"+getTotalSentBytes()+"/"+getTotalReceivedBytes()+" bytes ("+status+")");
- (source == null ? node.localSskFetchBytesSentAverage :
node.remoteSskFetchBytesSentAverage)
- .report(getTotalSentBytes());
- (source == null ? node.localSskFetchBytesReceivedAverage :
node.remoteSskFetchBytesReceivedAverage)
- .report(getTotalReceivedBytes());
- } else {
- Logger.minor(this, "CHK fetch cost
"+getTotalSentBytes()+"/"+getTotalReceivedBytes()+" bytes ("+status+")");
- (source == null ? node.localChkFetchBytesSentAverage :
node.remoteChkFetchBytesSentAverage)
- .report(getTotalSentBytes());
- (source == null ? node.localChkFetchBytesReceivedAverage :
node.remoteChkFetchBytesReceivedAverage)
- .report(getTotalReceivedBytes());
- }
- }
-
synchronized(this) {
notifyAll();
}
Modified: trunk/freenet/src/freenet/node/SSKInsertHandler.java
===================================================================
--- trunk/freenet/src/freenet/node/SSKInsertHandler.java 2006-07-10
23:27:27 UTC (rev 9552)
+++ trunk/freenet/src/freenet/node/SSKInsertHandler.java 2006-07-10
23:35:54 UTC (rev 9553)
@@ -20,7 +20,7 @@
* Handles an incoming SSK insert.
* SSKs need their own insert/request classes, see comments in SSKInsertSender.
*/
-public class SSKInsertHandler implements Runnable {
+public class SSKInsertHandler implements Runnable, ByteCounter {
static final int PUBKEY_TIMEOUT = 10000;
@@ -85,7 +85,7 @@
Message accepted = DMT.createFNPSSKAccepted(uid, pubKey == null);
try {
- source.send(accepted, null);
+ source.send(accepted, this);
} catch (NotConnectedException e1) {
Logger.minor(this, "Lost connection to source");
return;
@@ -98,7 +98,7 @@
MessageFilter mfPK =
MessageFilter.create().setType(DMT.FNPSSKPubKey).setField(DMT.UID,
uid).setSource(source).setTimeout(PUBKEY_TIMEOUT);
try {
- Message pk = node.usm.waitFor(mfPK, null);
+ Message pk = node.usm.waitFor(mfPK, this);
if(pk == null) {
Logger.normal(this, "Failed to receive
FNPSSKPubKey for "+uid);
return;
@@ -109,7 +109,7 @@
Logger.minor(this, "Got pubkey on
"+uid+" : "+pubKey);
Message confirm =
DMT.createFNPSSKPubKeyAccepted(uid);
try {
- source.sendAsync(confirm, null,
0, null);
+ source.sendAsync(confirm, null,
0, this);
} catch (NotConnectedException e) {
Logger.minor(this, "Lost
connection to source on "+uid);
return;
@@ -118,7 +118,7 @@
Logger.error(this, "Invalid pubkey from
"+source+" on "+uid);
Message msg =
DMT.createFNPDataInsertRejected(uid, DMT.DATA_INSERT_REJECTED_SSK_ERROR);
try {
- source.send(msg, null);
+ source.send(msg, this);
} catch (NotConnectedException ee) {
// Ignore
}
@@ -137,7 +137,7 @@
Logger.error(this, "Invalid SSK from "+source, e1);
Message msg = DMT.createFNPDataInsertRejected(uid,
DMT.DATA_INSERT_REJECTED_SSK_ERROR);
try {
- source.send(msg, null);
+ source.send(msg, this);
} catch (NotConnectedException e) {
// Ignore
}
@@ -149,7 +149,7 @@
if((storedBlock != null) && !storedBlock.equals(block)) {
Message msg = DMT.createFNPSSKDataFound(uid,
storedBlock.getRawHeaders(), storedBlock.getRawData());
try {
- source.send(msg, null);
+ source.send(msg, this);
} catch (NotConnectedException e) {
Logger.minor(this, "Lost connection to source
on "+uid);
}
@@ -162,12 +162,12 @@
Message msg = DMT.createFNPInsertReply(uid);
sentSuccess = true;
try {
- source.send(msg, null);
+ source.send(msg, this);
} catch (NotConnectedException e) {
// Ignore
}
canCommit = true;
- finish();
+ finish(SSKInsertSender.SUCCESS);
return;
}
@@ -191,7 +191,7 @@
// Forward it
Message m = DMT.createFNPRejectedOverload(uid, false);
try {
- source.send(m, null);
+ source.send(m, this);
} catch (NotConnectedException e) {
Logger.minor(this, "Lost connection to
source");
return;
@@ -210,7 +210,7 @@
}
Message msg = DMT.createFNPSSKDataFound(uid, headers, data);
try {
- source.send(msg, null);
+ source.send(msg, this);
} catch (NotConnectedException e) {
Logger.minor(this, "Lost connection to source");
return;
@@ -231,7 +231,7 @@
(status == SSKInsertSender.INTERNAL_ERROR)) {
Message msg = DMT.createFNPRejectedOverload(uid, true);
try {
- source.send(msg, null);
+ source.send(msg, this);
} catch (NotConnectedException e) {
Logger.minor(this, "Lost connection to
source");
return;
@@ -240,7 +240,7 @@
if((status == SSKInsertSender.TIMED_OUT) ||
(status ==
SSKInsertSender.GENERATED_REJECTED_OVERLOAD))
canCommit = true;
- finish();
+ finish(status);
return;
}
@@ -253,7 +253,7 @@
return;
}
canCommit = true;
- finish();
+ finish(status);
return;
}
@@ -267,7 +267,7 @@
return;
}
canCommit = true;
- finish();
+ finish(status);
return;
}
@@ -279,7 +279,7 @@
} catch (NotConnectedException e) {
// Ignore
}
- finish();
+ finish(status);
return;
}
}
@@ -288,7 +288,7 @@
* If canCommit, and we have received all the data, and it
* verifies, then commit it.
*/
- private void finish() {
+ private void finish(int code) {
Logger.minor(this, "Finishing");
if(canCommit) {
@@ -298,6 +298,44 @@
Logger.normal(this, "Collision on "+this);
}
}
+
+ if(code != SSKInsertSender.TIMED_OUT && code !=
SSKInsertSender.GENERATED_REJECTED_OVERLOAD &&
+ code != SSKInsertSender.INTERNAL_ERROR && code !=
SSKInsertSender.ROUTE_REALLY_NOT_FOUND) {
+ int totalSent = getTotalSentBytes();
+ int totalReceived = getTotalReceivedBytes();
+ if(sender != null) {
+ totalSent += sender.getTotalSentBytes();
+ totalReceived += sender.getTotalReceivedBytes();
+ }
+ Logger.minor(this, "Remote SSK insert cost
"+totalSent+"/"+totalReceived+" bytes ("+code+")");
+ node.remoteSskInsertBytesSentAverage.report(totalSent);
+ node.remoteSskInsertBytesReceivedAverage.report(totalReceived);
+ }
+
}
+
+ private final Object totalBytesSync = new Object();
+ private int totalBytesSent;
+ private int totalBytesReceived;
+ public void sentBytes(int x) {
+ synchronized(totalBytesSync) {
+ totalBytesSent += x;
+ }
+ }
+
+ public void receivedBytes(int x) {
+ synchronized(totalBytesSync) {
+ totalBytesReceived += x;
+ }
+ }
+
+ public int getTotalSentBytes() {
+ return totalBytesSent;
+ }
+
+ public int getTotalReceivedBytes() {
+ return totalBytesReceived;
+ }
+
}
Modified: trunk/freenet/src/freenet/node/SSKInsertSender.java
===================================================================
--- trunk/freenet/src/freenet/node/SSKInsertSender.java 2006-07-10 23:27:27 UTC
(rev 9552)
+++ trunk/freenet/src/freenet/node/SSKInsertSender.java 2006-07-10 23:35:54 UTC
(rev 9553)
@@ -451,15 +451,6 @@
notifyAll();
}
- if((code != TIMED_OUT) && (code != GENERATED_REJECTED_OVERLOAD) &&
(code != INTERNAL_ERROR)
- && (code != ROUTE_REALLY_NOT_FOUND)) {
- Logger.minor(this, "SSK insert cost
"+getTotalSentBytes()+"/"+getTotalReceivedBytes()+" bytes ("+code+")");
- (source == null ? node.localChkInsertBytesSentAverage :
node.remoteChkInsertBytesSentAverage)
- .report(getTotalSentBytes());
- (source == null ? node.localChkInsertBytesReceivedAverage :
node.remoteChkInsertBytesReceivedAverage)
- .report(getTotalReceivedBytes());
- }
-
Logger.minor(this, "Set status code: "+getStatusString());
// Nothing to wait for, no downstream transfers, just exit.
}
Modified: trunk/freenet/src/freenet/node/Version.java
===================================================================
--- trunk/freenet/src/freenet/node/Version.java 2006-07-10 23:27:27 UTC (rev
9552)
+++ trunk/freenet/src/freenet/node/Version.java 2006-07-10 23:35:54 UTC (rev
9553)
@@ -18,12 +18,12 @@
public static final String protocolVersion = "1.0";
/** The build number of the current revision */
- private static final int buildNumber = 870;
+ private static final int buildNumber = 871;
/** Oldest build of Fred we will talk to */
private static final int oldLastGoodBuild = 844;
- private static final int newLastGoodBuild = 868;
- private static final long transitionTime = 1152410400000L; // 2:00 GMT
9/07/06
+ private static final int newLastGoodBuild = 871;
+ private static final long transitionTime = 1153094400000L; // 0:00 GMT
17/07/06
public static final int buildNumber() {
return buildNumber;