Author: toad
Date: 2009-01-12 23:11:51 +0000 (Mon, 12 Jan 2009)
New Revision: 25041
Modified:
trunk/freenet/src/freenet/io/comm/PeerContext.java
trunk/freenet/src/freenet/io/xfer/BlockReceiver.java
trunk/freenet/src/freenet/node/CHKInsertHandler.java
trunk/freenet/src/freenet/node/RequestSender.java
trunk/freenet/src/freenet/support/math/MedianMeanRunningAverage.java
Log:
Treat a transfer taking more than 15 seconds as a transfer failure for purposes
of backoff.
More logging/statistics.
Modified: trunk/freenet/src/freenet/io/comm/PeerContext.java
===================================================================
--- trunk/freenet/src/freenet/io/comm/PeerContext.java 2009-01-12 20:04:10 UTC
(rev 25040)
+++ trunk/freenet/src/freenet/io/comm/PeerContext.java 2009-01-12 23:11:51 UTC
(rev 25041)
@@ -59,4 +59,7 @@
/** Compact toString() */
String shortToString();
+
+ /** Report a transfer failure */
+ void transferFailed(String reason);
}
Modified: trunk/freenet/src/freenet/io/xfer/BlockReceiver.java
===================================================================
--- trunk/freenet/src/freenet/io/xfer/BlockReceiver.java 2009-01-12
20:04:10 UTC (rev 25040)
+++ trunk/freenet/src/freenet/io/xfer/BlockReceiver.java 2009-01-12
23:11:51 UTC (rev 25041)
@@ -31,6 +31,7 @@
import freenet.io.comm.NotConnectedException;
import freenet.io.comm.PeerContext;
import freenet.io.comm.RetrievalException;
+import freenet.node.Ticker;
import freenet.support.BitArray;
import freenet.support.Buffer;
import freenet.support.Logger;
@@ -53,6 +54,8 @@
public static final int MAX_CONSECUTIVE_MISSING_PACKET_REPORTS = 4;
public static final int MAX_SEND_INTERVAL = 500;
public static final int CLEANUP_TIMEOUT = 5000;
+ // After 15 seconds, the receive is overdue and will cause backoff.
+ public static final int TOO_LONG_TIMEOUT = 15000;
PartiallyReceivedBlock _prb;
PeerContext _sender;
long _uid;
@@ -60,18 +63,21 @@
/** packet : Integer -> reportTime : Long * */
HashMap<Integer, Long> _recentlyReportedMissingPackets = new
HashMap<Integer, Long>();
ByteCounter _ctr;
+ Ticker _ticker;
boolean sentAborted;
private MessageFilter discardFilter;
private long discardEndTime;
+ private boolean tookTooLong;
boolean logMINOR=Logger.shouldLog(Logger.MINOR, this);
- public BlockReceiver(MessageCore usm, PeerContext sender, long uid,
PartiallyReceivedBlock prb, ByteCounter ctr) {
+ public BlockReceiver(MessageCore usm, PeerContext sender, long uid,
PartiallyReceivedBlock prb, ByteCounter ctr, Ticker ticker) {
_sender = sender;
_prb = prb;
_uid = uid;
_usm = usm;
_ctr = ctr;
+ _ticker = ticker;
}
public void sendAborted(int reason, String desc) throws
NotConnectedException {
@@ -81,6 +87,22 @@
public byte[] receive() throws RetrievalException {
long startTime = System.currentTimeMillis();
+ _ticker.queueTimedJob(new Runnable() {
+
+ public void run() {
+ if(!_sender.isConnected()) return;
+ try {
+ if(_prb.allReceived()) return;
+ } catch (AbortedException e) {
+ return;
+ }
+ synchronized(BlockReceiver.this) {
+ tookTooLong = true;
+ }
+ _sender.transferFailed("Took too long (still
running)");
+ }
+
+ }, TOO_LONG_TIMEOUT);
int consecutiveMissingPacketReports = 0;
try {
MessageFilter mfPacketTransmit =
MessageFilter.create().setTimeout(RECEIPT_TIMEOUT).setType(DMT.packetTransmit).setField(DMT.UID,
_uid).setSource(_sender);
@@ -233,5 +255,9 @@
public void onRestarted(PeerContext ctx) {
// Ignore
}
+
+ public synchronized boolean tookTooLong() {
+ return tookTooLong;
+ }
}
Modified: trunk/freenet/src/freenet/node/CHKInsertHandler.java
===================================================================
--- trunk/freenet/src/freenet/node/CHKInsertHandler.java 2009-01-12
20:04:10 UTC (rev 25040)
+++ trunk/freenet/src/freenet/node/CHKInsertHandler.java 2009-01-12
23:11:51 UTC (rev 25041)
@@ -121,7 +121,7 @@
Message m = DMT.createFNPInsertTransfersCompleted(uid,
true);
source.sendAsync(m, null, this);
prb = new PartiallyReceivedBlock(Node.PACKETS_IN_BLOCK,
Node.PACKET_SIZE);
- br = new BlockReceiver(node.usm, source, uid, prb,
this);
+ br = new BlockReceiver(node.usm, source, uid, prb,
this, node.getTicker());
prb.abort(RetrievalException.NO_DATAINSERT, "No
DataInsert");
br.sendAborted(RetrievalException.NO_DATAINSERT, "No
DataInsert");
return;
@@ -143,7 +143,7 @@
prb = new PartiallyReceivedBlock(Node.PACKETS_IN_BLOCK,
Node.PACKET_SIZE);
if(htl > 0)
sender = node.makeInsertSender(key, htl, uid, source, headers,
prb, false, true);
- br = new BlockReceiver(node.usm, source, uid, prb, this);
+ br = new BlockReceiver(node.usm, source, uid, prb, this,
node.getTicker());
// Receive the data, off thread
Runnable dataReceiver = new DataReceiver();
Modified: trunk/freenet/src/freenet/node/RequestSender.java
===================================================================
--- trunk/freenet/src/freenet/node/RequestSender.java 2009-01-12 20:04:10 UTC
(rev 25040)
+++ trunk/freenet/src/freenet/node/RequestSender.java 2009-01-12 23:11:51 UTC
(rev 25041)
@@ -266,12 +266,13 @@
}
fireCHKTransferBegins();
- BlockReceiver br = new BlockReceiver(node.usm,
pn, uid, prb, this);
+ BlockReceiver br = new BlockReceiver(node.usm,
pn, uid, prb, this, node.getTicker());
try {
if(logMINOR) Logger.minor(this,
"Receiving data");
byte[] data = br.receive();
- pn.transferSuccess();
+ if(!br.tookTooLong())
+ pn.transferSuccess();
if(logMINOR) Logger.minor(this,
"Received data");
// Received data
try {
@@ -292,7 +293,8 @@
// A certain
number of these are normal, it's better to track them through statistics than
call attention to them in the logs.
Logger.normal(this, "Transfer for offer failed
("+e.getReason()+"/"+RetrievalException.getErrString(e.getReason())+"): "+e+"
from "+pn, e);
finish(GET_OFFER_TRANSFER_FAILED, pn,
true);
-
pn.transferFailed("RequestSenderGetOfferedTransferFailed");
+ if(!br.tookTooLong())
+
pn.transferFailed("RequestSenderGetOfferedTransferFailed");
offers.deleteLastOffer();
node.nodeStats.failedBlockReceive();
return;
@@ -768,12 +770,16 @@
}
fireCHKTransferBegins();
- BlockReceiver br = new BlockReceiver(node.usm,
next, uid, prb, this);
+ long tStart = System.currentTimeMillis();
+ BlockReceiver br = new BlockReceiver(node.usm,
next, uid, prb, this, node.getTicker());
try {
if(logMINOR) Logger.minor(this,
"Receiving data");
byte[] data = br.receive();
- next.transferSuccess();
+ long tEnd = System.currentTimeMillis();
+ this.transferTime = tEnd - tStart;
+ if(!br.tookTooLong())
+ next.transferSuccess();
next.successNotOverload();
if(logMINOR) Logger.minor(this,
"Received data");
// Received data
@@ -796,7 +802,8 @@
next.localRejectedOverload("TransferFailedRequest"+e.getReason());
finish(TRANSFER_FAILED, next, false);
node.failureTable.onFinalFailure(key,
next, htl, FailureTable.REJECT_TIME, source);
-
next.transferFailed("RequestSenderTransferFailed");
+ if(!br.tookTooLong())
+
next.transferFailed("RequestSenderTransferFailed");
return;
}
} finally {
@@ -1059,6 +1066,10 @@
private static MedianMeanRunningAverage avgTimeTaken = new
MedianMeanRunningAverage();
+ private static MedianMeanRunningAverage avgTimeTakenTransfer = new
MedianMeanRunningAverage();
+
+ private long transferTime;
+
private void finish(int code, PeerNode next, boolean fromOfferedKey) {
if(logMINOR) Logger.minor(this, "finish("+code+ ')');
@@ -1070,11 +1081,14 @@
}
if(status == SUCCESS) {
- if(key instanceof NodeCHK) {
+ if(key instanceof NodeCHK && transferTime > 0) {
long timeTaken = System.currentTimeMillis() - startTime;
synchronized(avgTimeTaken) {
avgTimeTaken.report(timeTaken);
+ avgTimeTakenTransfer.report(transferTime);
if(logMINOR) Logger.minor(this, "Successful CHK
request took "+timeTaken+" average "+avgTimeTaken);
+ if(logMINOR) Logger.minor(this, "Successful CHK
request transfer "+transferTime+" average "+avgTimeTakenTransfer);
+ if(logMINOR) Logger.minor(this, "Search phase:
median "+(avgTimeTaken.currentValue() -
avgTimeTakenTransfer.currentValue())+"ms, mean "+(avgTimeTaken.meanValue() -
avgTimeTakenTransfer.meanValue())+"ms");
}
}
if(next != null) {
Modified: trunk/freenet/src/freenet/support/math/MedianMeanRunningAverage.java
===================================================================
--- trunk/freenet/src/freenet/support/math/MedianMeanRunningAverage.java
2009-01-12 20:04:10 UTC (rev 25040)
+++ trunk/freenet/src/freenet/support/math/MedianMeanRunningAverage.java
2009-01-12 23:11:51 UTC (rev 25041)
@@ -50,5 +50,9 @@
public synchronized String toString() {
return "Median "+currentValue()+" mean "+mean.currentValue();
}
+
+ public synchronized double meanValue() {
+ return mean.currentValue();
+ }
}
_______________________________________________
cvs mailing list
[email protected]
http://emu.freenetproject.org/cgi-bin/mailman/listinfo/cvs