Author: toad
Date: 2009-01-12 23:11:51 +0000 (Mon, 12 Jan 2009)
New Revision: 25041

Modified:
   trunk/freenet/src/freenet/io/comm/PeerContext.java
   trunk/freenet/src/freenet/io/xfer/BlockReceiver.java
   trunk/freenet/src/freenet/node/CHKInsertHandler.java
   trunk/freenet/src/freenet/node/RequestSender.java
   trunk/freenet/src/freenet/support/math/MedianMeanRunningAverage.java
Log:
Treat a transfer that takes more than 15 seconds as a transfer failure for the
purposes of backoff, even if the transfer is still running.
Add more logging and statistics (transfer-time and search-phase averages).


Modified: trunk/freenet/src/freenet/io/comm/PeerContext.java
===================================================================
--- trunk/freenet/src/freenet/io/comm/PeerContext.java  2009-01-12 20:04:10 UTC 
(rev 25040)
+++ trunk/freenet/src/freenet/io/comm/PeerContext.java  2009-01-12 23:11:51 UTC 
(rev 25041)
@@ -59,4 +59,7 @@
 
        /** Compact toString() */
        String shortToString();
+       
+       /** Report a transfer failure */
+       void transferFailed(String reason);
 }

Modified: trunk/freenet/src/freenet/io/xfer/BlockReceiver.java
===================================================================
--- trunk/freenet/src/freenet/io/xfer/BlockReceiver.java        2009-01-12 
20:04:10 UTC (rev 25040)
+++ trunk/freenet/src/freenet/io/xfer/BlockReceiver.java        2009-01-12 
23:11:51 UTC (rev 25041)
@@ -31,6 +31,7 @@
 import freenet.io.comm.NotConnectedException;
 import freenet.io.comm.PeerContext;
 import freenet.io.comm.RetrievalException;
+import freenet.node.Ticker;
 import freenet.support.BitArray;
 import freenet.support.Buffer;
 import freenet.support.Logger;
@@ -53,6 +54,8 @@
        public static final int MAX_CONSECUTIVE_MISSING_PACKET_REPORTS = 4;
        public static final int MAX_SEND_INTERVAL = 500;
        public static final int CLEANUP_TIMEOUT = 5000;
+       // After 15 seconds, the receive is overdue and will cause backoff.
+       public static final int TOO_LONG_TIMEOUT = 15000;
        PartiallyReceivedBlock _prb;
        PeerContext _sender;
        long _uid;
@@ -60,18 +63,21 @@
        /** packet : Integer -> reportTime : Long * */
        HashMap<Integer, Long> _recentlyReportedMissingPackets = new 
HashMap<Integer, Long>();
        ByteCounter _ctr;
+       Ticker _ticker;
        boolean sentAborted;
        private MessageFilter discardFilter;
        private long discardEndTime;
+       private boolean tookTooLong;
 
        boolean logMINOR=Logger.shouldLog(Logger.MINOR, this);
        
-       public BlockReceiver(MessageCore usm, PeerContext sender, long uid, 
PartiallyReceivedBlock prb, ByteCounter ctr) {
+       public BlockReceiver(MessageCore usm, PeerContext sender, long uid, 
PartiallyReceivedBlock prb, ByteCounter ctr, Ticker ticker) {
                _sender = sender;
                _prb = prb;
                _uid = uid;
                _usm = usm;
                _ctr = ctr;
+               _ticker = ticker;
        }
 
        public void sendAborted(int reason, String desc) throws 
NotConnectedException {
@@ -81,6 +87,22 @@
        
        public byte[] receive() throws RetrievalException {
                long startTime = System.currentTimeMillis();
+               _ticker.queueTimedJob(new Runnable() {
+
+                       public void run() {
+                               if(!_sender.isConnected()) return;
+                               try {
+                                       if(_prb.allReceived()) return;
+                               } catch (AbortedException e) {
+                                       return;
+                               }
+                               synchronized(BlockReceiver.this) {
+                                       tookTooLong = true;
+                               }
+                               _sender.transferFailed("Took too long (still 
running)");
+                       }
+                       
+               }, TOO_LONG_TIMEOUT);
                int consecutiveMissingPacketReports = 0;
                try {
                        MessageFilter mfPacketTransmit = 
MessageFilter.create().setTimeout(RECEIPT_TIMEOUT).setType(DMT.packetTransmit).setField(DMT.UID,
 _uid).setSource(_sender);
@@ -233,5 +255,9 @@
        public void onRestarted(PeerContext ctx) {
                // Ignore
        }
+
+       public synchronized boolean tookTooLong() {
+               return tookTooLong;
+       }
        
 }

Modified: trunk/freenet/src/freenet/node/CHKInsertHandler.java
===================================================================
--- trunk/freenet/src/freenet/node/CHKInsertHandler.java        2009-01-12 
20:04:10 UTC (rev 25040)
+++ trunk/freenet/src/freenet/node/CHKInsertHandler.java        2009-01-12 
23:11:51 UTC (rev 25041)
@@ -121,7 +121,7 @@
                        Message m = DMT.createFNPInsertTransfersCompleted(uid, 
true);
                        source.sendAsync(m, null, this);
                        prb = new PartiallyReceivedBlock(Node.PACKETS_IN_BLOCK, 
Node.PACKET_SIZE);
-                       br = new BlockReceiver(node.usm, source, uid, prb, 
this);
+                       br = new BlockReceiver(node.usm, source, uid, prb, 
this, node.getTicker());
                        prb.abort(RetrievalException.NO_DATAINSERT, "No 
DataInsert");
                        br.sendAborted(RetrievalException.NO_DATAINSERT, "No 
DataInsert");
                        return;
@@ -143,7 +143,7 @@
         prb = new PartiallyReceivedBlock(Node.PACKETS_IN_BLOCK, 
Node.PACKET_SIZE);
         if(htl > 0)
             sender = node.makeInsertSender(key, htl, uid, source, headers, 
prb, false, true);
-        br = new BlockReceiver(node.usm, source, uid, prb, this);
+        br = new BlockReceiver(node.usm, source, uid, prb, this, 
node.getTicker());
         
         // Receive the data, off thread
         Runnable dataReceiver = new DataReceiver();

Modified: trunk/freenet/src/freenet/node/RequestSender.java
===================================================================
--- trunk/freenet/src/freenet/node/RequestSender.java   2009-01-12 20:04:10 UTC 
(rev 25040)
+++ trunk/freenet/src/freenet/node/RequestSender.java   2009-01-12 23:11:51 UTC 
(rev 25041)
@@ -266,12 +266,13 @@
                                }
                                fireCHKTransferBegins();
                                                
-                               BlockReceiver br = new BlockReceiver(node.usm, 
pn, uid, prb, this);
+                               BlockReceiver br = new BlockReceiver(node.usm, 
pn, uid, prb, this, node.getTicker());
                                
                                try {
                                        if(logMINOR) Logger.minor(this, 
"Receiving data");
                                        byte[] data = br.receive();
-                                       pn.transferSuccess();
+                                       if(!br.tookTooLong())
+                                               pn.transferSuccess();
                                        if(logMINOR) Logger.minor(this, 
"Received data");
                                        // Received data
                                        try {
@@ -292,7 +293,8 @@
                                                                // A certain 
number of these are normal, it's better to track them through statistics than 
call attention to them in the logs.
                                                                
Logger.normal(this, "Transfer for offer failed 
("+e.getReason()+"/"+RetrievalException.getErrString(e.getReason())+"): "+e+" 
from "+pn, e);
                                        finish(GET_OFFER_TRANSFER_FAILED, pn, 
true);
-                                       
pn.transferFailed("RequestSenderGetOfferedTransferFailed");
+                                       if(!br.tookTooLong())
+                                               
pn.transferFailed("RequestSenderGetOfferedTransferFailed");
                                offers.deleteLastOffer();
                                        node.nodeStats.failedBlockReceive();
                                        return;
@@ -768,12 +770,16 @@
                                }
                                fireCHKTransferBegins();
                                                
-                               BlockReceiver br = new BlockReceiver(node.usm, 
next, uid, prb, this);
+                               long tStart = System.currentTimeMillis();
+                               BlockReceiver br = new BlockReceiver(node.usm, 
next, uid, prb, this, node.getTicker());
                                
                                try {
                                        if(logMINOR) Logger.minor(this, 
"Receiving data");
                                        byte[] data = br.receive();
-                                       next.transferSuccess();
+                                       long tEnd = System.currentTimeMillis();
+                                       this.transferTime = tEnd - tStart;
+                                       if(!br.tookTooLong())
+                                               next.transferSuccess();
                                next.successNotOverload();
                                        if(logMINOR) Logger.minor(this, 
"Received data");
                                        // Received data
@@ -796,7 +802,8 @@
                                                        
next.localRejectedOverload("TransferFailedRequest"+e.getReason());
                                        finish(TRANSFER_FAILED, next, false);
                                        node.failureTable.onFinalFailure(key, 
next, htl, FailureTable.REJECT_TIME, source);
-                                       
next.transferFailed("RequestSenderTransferFailed");
+                                       if(!br.tookTooLong())
+                                               
next.transferFailed("RequestSenderTransferFailed");
                                        return;
                                }
                        } finally {
@@ -1059,6 +1066,10 @@
     
        private static MedianMeanRunningAverage avgTimeTaken = new 
MedianMeanRunningAverage();
        
+       private static MedianMeanRunningAverage avgTimeTakenTransfer = new 
MedianMeanRunningAverage();
+       
+       private long transferTime;
+       
     private void finish(int code, PeerNode next, boolean fromOfferedKey) {
        if(logMINOR) Logger.minor(this, "finish("+code+ ')');
         
@@ -1070,11 +1081,14 @@
         }
                
         if(status == SUCCESS) {
-               if(key instanceof NodeCHK) {
+               if(key instanceof NodeCHK && transferTime > 0) {
                        long timeTaken = System.currentTimeMillis() - startTime;
                        synchronized(avgTimeTaken) {
                                avgTimeTaken.report(timeTaken);
+                               avgTimeTakenTransfer.report(transferTime);
                                if(logMINOR) Logger.minor(this, "Successful CHK 
request took "+timeTaken+" average "+avgTimeTaken);
+                               if(logMINOR) Logger.minor(this, "Successful CHK 
request transfer "+transferTime+" average "+avgTimeTakenTransfer);
+                               if(logMINOR) Logger.minor(this, "Search phase: 
median "+(avgTimeTaken.currentValue() - 
avgTimeTakenTransfer.currentValue())+"ms, mean "+(avgTimeTaken.meanValue() - 
avgTimeTakenTransfer.meanValue())+"ms");
                        }
                }
                if(next != null) {

Modified: trunk/freenet/src/freenet/support/math/MedianMeanRunningAverage.java
===================================================================
--- trunk/freenet/src/freenet/support/math/MedianMeanRunningAverage.java        
2009-01-12 20:04:10 UTC (rev 25040)
+++ trunk/freenet/src/freenet/support/math/MedianMeanRunningAverage.java        
2009-01-12 23:11:51 UTC (rev 25041)
@@ -50,5 +50,9 @@
        public synchronized String toString() {
                return "Median "+currentValue()+" mean "+mean.currentValue();
        }
+       
+       public synchronized double meanValue() {
+               return mean.currentValue();
+       }
 
 }

_______________________________________________
cvs mailing list
[email protected]
http://emu.freenetproject.org/cgi-bin/mailman/listinfo/cvs

Reply via email to