Author: toad
Date: 2006-07-10 23:35:54 +0000 (Mon, 10 Jul 2006)
New Revision: 9553

Modified:
   trunk/freenet/src/freenet/node/CHKInsertSender.java
   trunk/freenet/src/freenet/node/InsertHandler.java
   trunk/freenet/src/freenet/node/Node.java
   trunk/freenet/src/freenet/node/RequestHandler.java
   trunk/freenet/src/freenet/node/RequestSender.java
   trunk/freenet/src/freenet/node/SSKInsertHandler.java
   trunk/freenet/src/freenet/node/SSKInsertSender.java
   trunk/freenet/src/freenet/node/Version.java
Log:
871: Take into account bandwidth used by receiving requests/inserts as well as 
sending them when determining the bandwidth cost of a request.
Becomes mandatory at 0:00 GMT on Monday the 18th.

Modified: trunk/freenet/src/freenet/node/CHKInsertSender.java
===================================================================
--- trunk/freenet/src/freenet/node/CHKInsertSender.java 2006-07-10 23:27:27 UTC 
(rev 9552)
+++ trunk/freenet/src/freenet/node/CHKInsertSender.java 2006-07-10 23:35:54 UTC 
(rev 9553)
@@ -188,6 +188,8 @@
     static final int GENERATED_REJECTED_OVERLOAD = 5;
     /** Could not get off the node at all! */
     static final int ROUTE_REALLY_NOT_FOUND = 6;
+    /** Receive failed. Not used internally; only used by CHKInsertHandler. */
+    static final int RECEIVE_FAILED = 7;

     public String toString() {
         return super.toString()+" for "+uid;
@@ -554,15 +556,6 @@
                notifyAll();
         }

-        if((code != TIMED_OUT) && (code != GENERATED_REJECTED_OVERLOAD) && 
(code != INTERNAL_ERROR)
-                       && (code != ROUTE_REALLY_NOT_FOUND)) {
-               Logger.minor(this, "CHK insert cost 
"+getTotalSentBytes()+"/"+getTotalReceivedBytes()+" bytes ("+code+")");
-               (source == null ? node.localChkInsertBytesSentAverage : 
node.remoteChkInsertBytesSentAverage)
-                               .report(getTotalSentBytes());
-               (source == null ? node.localChkInsertBytesReceivedAverage : 
node.remoteChkInsertBytesReceivedAverage)
-                               .report(getTotalReceivedBytes());
-        }
-        
         Logger.minor(this, "Returning from finish()");
     }


Modified: trunk/freenet/src/freenet/node/InsertHandler.java
===================================================================
--- trunk/freenet/src/freenet/node/InsertHandler.java   2006-07-10 23:27:27 UTC 
(rev 9552)
+++ trunk/freenet/src/freenet/node/InsertHandler.java   2006-07-10 23:35:54 UTC 
(rev 9553)
@@ -22,7 +22,7 @@
  * Handle an incoming insert request.
  * This corresponds to RequestHandler.
  */
-public class InsertHandler implements Runnable {
+public class InsertHandler implements Runnable, ByteCounter {


     static final int DATA_INSERT_TIMEOUT = 10000;
@@ -80,7 +80,7 @@
         // Send Accepted
         Message accepted = DMT.createFNPAccepted(uid);
         try {
-                       source.send(accepted, null);
+                       source.send(accepted, this);
                } catch (NotConnectedException e1) {
                        Logger.minor(this, "Lost connection to source");
                        return;
@@ -93,7 +93,7 @@

         Message msg;
         try {
-            msg = node.usm.waitFor(mf, null);
+            msg = node.usm.waitFor(mf, this);
         } catch (DisconnectedException e) {
             Logger.normal(this, "Disconnected while waiting for DataInsert on 
"+uid);
             return;
@@ -106,11 +106,11 @@
                        if(source.isConnected() && (startTime > 
(source.timeLastConnected()+Node.HANDSHAKE_TIMEOUT*4)))
                                Logger.error(this, "Did not receive DataInsert 
on "+uid+" from "+source+" !");
                        Message tooSlow = DMT.createFNPRejectedTimeout(uid);
-                       source.sendAsync(tooSlow, null, 0, null);
+                       source.sendAsync(tooSlow, null, 0, this);
                        Message m = DMT.createFNPInsertTransfersCompleted(uid, 
true);
-                       source.sendAsync(m, null, 0, null);
+                       source.sendAsync(m, null, 0, this);
                        prb = new PartiallyReceivedBlock(Node.PACKETS_IN_BLOCK, 
Node.PACKET_SIZE);
-                       br = new BlockReceiver(node.usm, source, uid, prb, 
null);
+                       br = new BlockReceiver(node.usm, source, uid, prb, 
this);
                        prb.abort(RetrievalException.NO_DATAINSERT, "No 
DataInsert");
                        br.sendAborted(RetrievalException.NO_DATAINSERT, "No 
DataInsert");
                        return;
@@ -132,7 +132,7 @@
         prb = new PartiallyReceivedBlock(Node.PACKETS_IN_BLOCK, 
Node.PACKET_SIZE);
         if(htl > 0)
             sender = node.makeInsertSender(key, htl, uid, source, headers, 
prb, false, closestLoc, true);
-        br = new BlockReceiver(node.usm, source, uid, prb, null);
+        br = new BlockReceiver(node.usm, source, uid, prb, this);

         // Receive the data, off thread

@@ -146,11 +146,11 @@
                msg = DMT.createFNPInsertReply(uid);
                sentSuccess = true;
                try {
-                               source.send(msg, null);
+                               source.send(msg, this);
                        } catch (NotConnectedException e) {
                                // Ignore
                        }
-            finish();
+            finish(CHKInsertSender.SUCCESS);
             return;
         }

@@ -186,7 +186,7 @@
                 // Cancel the sender
                 sender.receiveFailed(); // tell it to stop if it hasn't 
already failed... unless it's sending from store
                 // Nothing else we can do
-                finish();
+                finish(CHKInsertSender.RECEIVE_FAILED);
                 return;
             }

@@ -195,7 +195,7 @@
                // Forward it
                Message m = DMT.createFNPRejectedOverload(uid, false);
                try {
-                                       source.send(m, null);
+                                       source.send(m, this);
                                } catch (NotConnectedException e) {
                                        Logger.minor(this, "Lost connection to 
source");
                                        return;
@@ -225,7 +225,7 @@
                        (status == CHKInsertSender.INTERNAL_ERROR)) {
                 msg = DMT.createFNPRejectedOverload(uid, true);
                 try {
-                                       source.send(msg, null);
+                                       source.send(msg, this);
                                } catch (NotConnectedException e) {
                                        Logger.minor(this, "Lost connection to 
source");
                                        return;
@@ -234,20 +234,20 @@
                 if((status == CHKInsertSender.TIMED_OUT) ||
                                (status == 
CHKInsertSender.GENERATED_REJECTED_OVERLOAD))
                        canCommit = true;
-                finish();
+                finish(status);
                 return;
             }

             if((status == CHKInsertSender.ROUTE_NOT_FOUND) || (status == 
CHKInsertSender.ROUTE_REALLY_NOT_FOUND)) {
                 msg = DMT.createFNPRouteNotFound(uid, sender.getHTL());
                 try {
-                                       source.send(msg, null);
+                                       source.send(msg, this);
                                } catch (NotConnectedException e) {
                                        Logger.minor(this, "Lost connection to 
source");
                                        return;
                                }
                 canCommit = true;
-                finish();
+                finish(status);
                 return;
             }

@@ -255,13 +255,13 @@
                msg = DMT.createFNPInsertReply(uid);
                sentSuccess = true;
                try {
-                                       source.send(msg, null);
+                                       source.send(msg, this);
                                } catch (NotConnectedException e) {
                                        Logger.minor(this, "Lost connection to 
source");
                                        return;
                                }
                 canCommit = true;
-                finish();
+                finish(status);
                 return;
             }

@@ -269,11 +269,11 @@
             Logger.error(this, "Unknown status code: 
"+sender.getStatusString());
             msg = DMT.createFNPRejectedOverload(uid, true);
             try {
-                               source.send(msg, null);
+                               source.send(msg, this);
                        } catch (NotConnectedException e) {
                                // Ignore
                        }
-            finish();
+            finish(CHKInsertSender.INTERNAL_ERROR);
             return;
         }
        }
@@ -286,7 +286,7 @@
      * If canCommit, and we have received all the data, and it
      * verifies, then commit it.
      */
-    private void finish() {
+    private void finish(int code) {
        Logger.minor(this, "Finishing");
         maybeCommit();

@@ -314,13 +314,27 @@
                boolean failed = sender.anyTransfersFailed();
                Message m = DMT.createFNPInsertTransfersCompleted(uid, failed);
                try {
-                       source.sendAsync(m, null, 0, null);
+                       source.send(m, this);
                        Logger.minor(this, "Sent completion: "+failed+" for 
"+this);
                } catch (NotConnectedException e1) {
                        Logger.minor(this, "Not connected: "+source+" for 
"+this);
                        // May need to commit anyway...
                }
         }
+        
+        if(code != CHKInsertSender.TIMED_OUT && code != 
CHKInsertSender.GENERATED_REJECTED_OVERLOAD && 
+                       code != CHKInsertSender.INTERNAL_ERROR && code != 
CHKInsertSender.ROUTE_REALLY_NOT_FOUND &&
+                       code != CHKInsertSender.RECEIVE_FAILED) {
+               int totalSent = getTotalSentBytes();
+               int totalReceived = getTotalReceivedBytes();
+               if(sender != null) {
+                       totalSent += sender.getTotalSentBytes();
+                       totalReceived += sender.getTotalReceivedBytes();
+               }
+               Logger.minor(this, "Remote CHK insert cost 
"+totalSent+"/"+totalReceived+" bytes ("+code+")");
+               node.remoteChkInsertBytesSentAverage.report(totalSent);
+               node.remoteChkInsertBytesReceivedAverage.report(totalReceived);
+        }
     }

     private void maybeCommit() {
@@ -344,7 +358,7 @@
         }
         if(toSend != null) {
             try {
-                source.sendAsync(toSend, null, 0, null);
+                source.sendAsync(toSend, null, 0, this);
             } catch (NotConnectedException e) {
                 // :(
                 Logger.minor(this, "Lost connection in "+this+" when sending 
FNPDataInsertRejected");
@@ -368,7 +382,7 @@
                 runThread.interrupt();
                 Message msg = DMT.createFNPDataInsertRejected(uid, 
DMT.DATA_INSERT_REJECTED_RECEIVE_FAILED);
                 try {
-                    source.send(msg, null);
+                    source.send(msg, InsertHandler.this);
                 } catch (NotConnectedException ex) {
                     Logger.error(this, "Can't send "+msg+" to "+source+": 
"+ex);
                 }
@@ -385,4 +399,27 @@

     }

+    private final Object totalSync = new Object();
+    private int totalSentBytes;
+    private int totalReceivedBytes;
+    
+       public void sentBytes(int x) {
+               synchronized(totalSync) {
+                       totalSentBytes += x;
+               }
+       }
+
+       public void receivedBytes(int x) {
+               synchronized(totalSync) {
+                       totalReceivedBytes += x;
+               }
+       }
+
+       public int getTotalSentBytes() {
+               synchronized(totalSync) { return totalSentBytes; } // same lock sentBytes() writes under
+       }
+       
+       public int getTotalReceivedBytes() {
+               synchronized(totalSync) { return totalReceivedBytes; } // same lock receivedBytes() writes under
+       }
 }

Modified: trunk/freenet/src/freenet/node/Node.java
===================================================================
--- trunk/freenet/src/freenet/node/Node.java    2006-07-10 23:27:27 UTC (rev 
9552)
+++ trunk/freenet/src/freenet/node/Node.java    2006-07-10 23:35:54 UTC (rev 
9553)
@@ -1585,23 +1585,24 @@

                // Select the request scheduler

-               localChkFetchBytesSentAverage = new 
TimeDecayingRunningAverage(0.0, 180000, 0.0, Long.MAX_VALUE);
-               localSskFetchBytesSentAverage = new 
TimeDecayingRunningAverage(0.0, 180000, 0.0, Long.MAX_VALUE);
+               // Guesstimates. Hopefully well over the reality.
+               localChkFetchBytesSentAverage = new 
TimeDecayingRunningAverage(500, 180000, 0.0, Long.MAX_VALUE);
+               localSskFetchBytesSentAverage = new 
TimeDecayingRunningAverage(500, 180000, 0.0, Long.MAX_VALUE);
                localChkInsertBytesSentAverage = new 
TimeDecayingRunningAverage(32768, 180000, 0.0, Long.MAX_VALUE);
                localSskInsertBytesSentAverage = new 
TimeDecayingRunningAverage(2048, 180000, 0.0, Long.MAX_VALUE);
                localChkFetchBytesReceivedAverage = new 
TimeDecayingRunningAverage(32768, 180000, 0.0, Long.MAX_VALUE);
                localSskFetchBytesReceivedAverage = new 
TimeDecayingRunningAverage(2048, 180000, 0.0, Long.MAX_VALUE);
-               localChkInsertBytesReceivedAverage = new 
TimeDecayingRunningAverage(0.0, 180000, 0.0, Long.MAX_VALUE);
-               localSskInsertBytesReceivedAverage = new 
TimeDecayingRunningAverage(0.0, 180000, 0.0, Long.MAX_VALUE);
+               localChkInsertBytesReceivedAverage = new 
TimeDecayingRunningAverage(1024, 180000, 0.0, Long.MAX_VALUE);
+               localSskInsertBytesReceivedAverage = new 
TimeDecayingRunningAverage(500, 180000, 0.0, Long.MAX_VALUE);

-               remoteChkFetchBytesSentAverage = new 
TimeDecayingRunningAverage(0.0, 180000, 0.0, Long.MAX_VALUE);
-               remoteSskFetchBytesSentAverage = new 
TimeDecayingRunningAverage(0.0, 180000, 0.0, Long.MAX_VALUE);
-               remoteChkInsertBytesSentAverage = new 
TimeDecayingRunningAverage(32768, 180000, 0.0, Long.MAX_VALUE);
-               remoteSskInsertBytesSentAverage = new 
TimeDecayingRunningAverage(2048, 180000, 0.0, Long.MAX_VALUE);
-               remoteChkFetchBytesReceivedAverage = new 
TimeDecayingRunningAverage(32768, 180000, 0.0, Long.MAX_VALUE);
-               remoteSskFetchBytesReceivedAverage = new 
TimeDecayingRunningAverage(2048, 180000, 0.0, Long.MAX_VALUE);
-               remoteChkInsertBytesReceivedAverage = new 
TimeDecayingRunningAverage(0.0, 180000, 0.0, Long.MAX_VALUE);
-               remoteSskInsertBytesReceivedAverage = new 
TimeDecayingRunningAverage(0.0, 180000, 0.0, Long.MAX_VALUE);
+               remoteChkFetchBytesSentAverage = new 
TimeDecayingRunningAverage(32768+1024+500, 180000, 0.0, Long.MAX_VALUE);
+               remoteSskFetchBytesSentAverage = new 
TimeDecayingRunningAverage(1024+1024+500, 180000, 0.0, Long.MAX_VALUE);
+               remoteChkInsertBytesSentAverage = new 
TimeDecayingRunningAverage(32768+32768+1024, 180000, 0.0, Long.MAX_VALUE);
+               remoteSskInsertBytesSentAverage = new 
TimeDecayingRunningAverage(1024+1024+500, 180000, 0.0, Long.MAX_VALUE);
+               remoteChkFetchBytesReceivedAverage = new 
TimeDecayingRunningAverage(32768+1024+500, 180000, 0.0, Long.MAX_VALUE);
+               remoteSskFetchBytesReceivedAverage = new 
TimeDecayingRunningAverage(2048+500, 180000, 0.0, Long.MAX_VALUE);
+               remoteChkInsertBytesReceivedAverage = new 
TimeDecayingRunningAverage(32768+1024+500, 180000, 0.0, Long.MAX_VALUE);
+               remoteSskInsertBytesReceivedAverage = new 
TimeDecayingRunningAverage(1024+1024+500, 180000, 0.0, Long.MAX_VALUE);

                // FIXME make all the below arbitrary constants configurable!

@@ -1882,6 +1883,12 @@
                        if(status == RequestSender.NOT_FINISHED) 
                                continue;

+               if(status != RequestSender.TIMED_OUT && status != 
RequestSender.GENERATED_REJECTED_OVERLOAD && status != 
RequestSender.INTERNAL_ERROR) {
+               Logger.minor(this, "CHK fetch cost 
"+rs.getTotalSentBytes()+"/"+rs.getTotalReceivedBytes()+" bytes ("+status+")");
+               localChkFetchBytesSentAverage.report(rs.getTotalSentBytes());
+               
localChkFetchBytesReceivedAverage.report(rs.getTotalReceivedBytes());
+               }
+                       
                        if((status == RequestSender.TIMED_OUT) ||
                                        (status == 
RequestSender.GENERATED_REJECTED_OVERLOAD)) {
                                if(!rejectedOverload) {
@@ -1970,6 +1977,12 @@

                        if(status == RequestSender.NOT_FINISHED) 
                                continue;
+
+               if(status != RequestSender.TIMED_OUT && status != 
RequestSender.GENERATED_REJECTED_OVERLOAD && status != 
RequestSender.INTERNAL_ERROR) {
+               Logger.minor(this, "SSK fetch cost 
"+rs.getTotalSentBytes()+"/"+rs.getTotalReceivedBytes()+" bytes ("+status+")");
+               localSskFetchBytesSentAverage.report(rs.getTotalSentBytes());
+               
localSskFetchBytesReceivedAverage.report(rs.getTotalReceivedBytes());
+               }

                        if((status == RequestSender.TIMED_OUT) ||
                                        (status == 
RequestSender.GENERATED_REJECTED_OVERLOAD)) {
@@ -2106,11 +2119,20 @@
                        }
                }

-               if(is.getStatus() == CHKInsertSender.SUCCESS) {
+               int status = is.getStatus();
+        if(status != CHKInsertSender.TIMED_OUT && status != 
CHKInsertSender.GENERATED_REJECTED_OVERLOAD && status != 
CHKInsertSender.INTERNAL_ERROR
+                       && status != CHKInsertSender.ROUTE_REALLY_NOT_FOUND) {
+               int sent = is.getTotalSentBytes();
+               int received = is.getTotalReceivedBytes();
+               Logger.minor(this, "Local CHK insert cost "+sent+"/"+received+" 
bytes ("+status+")");
+               localChkInsertBytesSentAverage.report(sent);
+               localChkInsertBytesReceivedAverage.report(received);
+        }
+        
+               if(status == CHKInsertSender.SUCCESS) {
                        Logger.normal(this, "Succeeded inserting "+block);
                        return;
                } else {
-                       int status = is.getStatus();
                        String msg = "Failed inserting "+block+" : 
"+is.getStatusString();
                        if(status == CHKInsertSender.ROUTE_NOT_FOUND)
                                msg += " - this is normal on small networks; 
the data will still be propagated, but it can't find the 20+ nodes needed for 
full success";
@@ -2203,6 +2225,17 @@
                        }
                }

+               int status = is.getStatus();
+               
+        if(status != CHKInsertSender.TIMED_OUT && status != 
CHKInsertSender.GENERATED_REJECTED_OVERLOAD && status != 
CHKInsertSender.INTERNAL_ERROR
+                       && status != CHKInsertSender.ROUTE_REALLY_NOT_FOUND) {
+               int sent = is.getTotalSentBytes();
+               int received = is.getTotalReceivedBytes();
+               Logger.minor(this, "Local SSK insert cost "+sent+"/"+received+" 
bytes ("+status+")");
+               localSskInsertBytesSentAverage.report(sent);
+               localSskInsertBytesReceivedAverage.report(received);
+        }
+        
                if(is.hasCollided()) {
                        // Store it locally so it can be fetched immediately, 
and overwrites any locally inserted.
                        try {
@@ -2215,11 +2248,10 @@
                        throw new 
LowLevelPutException(LowLevelPutException.COLLISION);
                }

-               if(is.getStatus() == SSKInsertSender.SUCCESS) {
+               if(status == SSKInsertSender.SUCCESS) {
                        Logger.normal(this, "Succeeded inserting "+block);
                        return;
                } else {
-                       int status = is.getStatus();
                        String msg = "Failed inserting "+block+" : 
"+is.getStatusString();
                        if(status == CHKInsertSender.ROUTE_NOT_FOUND)
                                msg += " - this is normal on small networks; 
the data will still be propagated, but it can't find the 20+ nodes needed for 
full success";
@@ -2266,13 +2298,16 @@
                double expected = 
                        (isInsert ? (isSSK ? 
this.remoteSskInsertBytesSentAverage : this.remoteChkInsertBytesSentAverage)
                                        : (isSSK ? 
this.remoteSskFetchBytesSentAverage : 
this.remoteChkFetchBytesSentAverage)).currentValue();
-               int e = (int)Math.max(expected, 0);
-               if(!requestOutputThrottle.instantGrab(e)) return "Insufficient 
output bandwidth";
+               int expectedSent = (int)Math.max(expected, 0);
+               if(!requestOutputThrottle.instantGrab(expectedSent)) return 
"Insufficient output bandwidth";
                expected = 
                        (isInsert ? (isSSK ? 
this.remoteSskInsertBytesReceivedAverage : 
this.remoteChkInsertBytesReceivedAverage)
                                        : (isSSK ? 
this.remoteSskFetchBytesReceivedAverage : 
this.remoteChkFetchBytesReceivedAverage)).currentValue();
-               e = (int)Math.max(expected, 0);
-               if(!requestInputThrottle.instantGrab(e)) return "Insufficient 
input bandwidth";
+               int expectedReceived = (int)Math.max(expected, 0);
+               if(!requestInputThrottle.instantGrab(expectedReceived)) {
+                       requestOutputThrottle.recycle(expectedSent);
+                       return "Insufficient input bandwidth";
+               }


                // If no recent reports, no packets have been sent; correct the 
average downwards.

Modified: trunk/freenet/src/freenet/node/RequestHandler.java
===================================================================
--- trunk/freenet/src/freenet/node/RequestHandler.java  2006-07-10 23:27:27 UTC 
(rev 9552)
+++ trunk/freenet/src/freenet/node/RequestHandler.java  2006-07-10 23:35:54 UTC 
(rev 9553)
@@ -18,7 +18,7 @@
  * is separated off into RequestSender so we get transfer coalescing
  * and both ends for free. 
  */
-public class RequestHandler implements Runnable {
+public class RequestHandler implements Runnable, ByteCounter {

     final Message req;
     final Node node;
@@ -28,6 +28,7 @@
     private double closestLoc;
     private boolean needsPubKey;
     final Key key;
+    private boolean finalTransferFailed = false;

     public String toString() {
         return super.toString()+" for "+uid;
@@ -51,6 +52,8 @@
     }

     public void run() {
+       int status = RequestSender.NOT_FINISHED;
+       RequestSender rs = null;
         try {
         Logger.minor(this, "Handling a request: "+uid);
         htl = source.decrementHTL(htl);
@@ -70,21 +73,24 @@
                        Logger.minor(this, "Sending PK: "+key+" 
"+key.writeAsField());
                        source.send(pk, null);
                 }
+                status = RequestSender.SUCCESS; // for byte logging
             }
             if(block instanceof CHKBlock) {
                PartiallyReceivedBlock prb =
                        new PartiallyReceivedBlock(Node.PACKETS_IN_BLOCK, 
Node.PACKET_SIZE, block.getRawData());
                BlockTransmitter bt =
                        new BlockTransmitter(node.usm, source, uid, prb, 
node.outputThrottle, null);
-               bt.send();
+               if(bt.send())
+                       status = RequestSender.SUCCESS; // for byte logging
             }
             return;
         }
-        RequestSender rs = (RequestSender) o;
+        rs = (RequestSender) o;

         if(rs == null) { // ran out of htl?
             Message dnf = DMT.createFNPDataNotFound(uid);
             source.send(dnf, null);
+            status = RequestSender.DATA_NOT_FOUND; // for byte logging
             return;
         }

@@ -105,18 +111,20 @@
                 PartiallyReceivedBlock prb = rs.getPRB();
                BlockTransmitter bt =
                    new BlockTransmitter(node.usm, source, uid, prb, 
node.outputThrottle, null);
-               bt.send(); // either fails or succeeds; other side will see, we 
don't care
+               if(!bt.send()) // only a failed final transfer suppresses byte-cost reporting
+                       finalTransferFailed = true;
                    return;
             }

-            int status = rs.getStatus();
+            status = rs.getStatus();
+
+            if(status == RequestSender.NOT_FINISHED) continue;

             switch(status) {
                case RequestSender.NOT_FINISHED:
-                   continue;
                case RequestSender.DATA_NOT_FOUND:
                     Message dnf = DMT.createFNPDataNotFound(uid);
-                       source.sendAsync(dnf, null, 0, null);
+                       source.send(dnf, this);
                        return;
                case RequestSender.GENERATED_REJECTED_OVERLOAD:
                case RequestSender.TIMED_OUT:
@@ -124,19 +132,20 @@
                        // Locally generated.
                    // Propagate back to source who needs to reduce send rate
                    Message reject = DMT.createFNPRejectedOverload(uid, true);
-                       source.sendAsync(reject, null, 0, null);
+                       source.send(reject, this);
                        return;
                case RequestSender.ROUTE_NOT_FOUND:
                    // Tell source
                    Message rnf = DMT.createFNPRouteNotFound(uid, rs.getHTL());
-                       source.sendAsync(rnf, null, 0, null);
+                       source.send(rnf, this);
                        return;
                case RequestSender.SUCCESS:
                        if(key instanceof NodeSSK) {
                         Message df = DMT.createFNPSSKDataFound(uid, 
rs.getHeaders(), rs.getSSKData());
-                        source.send(df, null);
+                        source.send(df, this);
                         if(needsPubKey) {
-                               source.send(df, null);
+                               Message pk = DMT.createFNPSSKPubKey(uid, 
((NodeSSK)rs.getSSKBlock().getKey()).getPubKey().asBytes());
+                               source.send(pk, this);
                         }
                        } else if(!rs.transferStarted()) {
                                Logger.error(this, "Status is SUCCESS but we 
never started a transfer on "+uid);
@@ -150,7 +159,7 @@
                                continue; // should have started transfer
                        }
                    reject = DMT.createFNPRejectedOverload(uid, true);
-                       source.sendAsync(reject, null, 0, null);
+                       source.send(reject, this);
                        return;
                case RequestSender.TRANSFER_FAILED:
                        if(key instanceof NodeCHK) {
@@ -169,6 +178,21 @@
             Logger.error(this, "Caught "+t, t);
         } finally {
             node.unlockUID(uid);
+            if((!finalTransferFailed) && rs != null && status != 
RequestSender.TIMED_OUT && status != RequestSender.GENERATED_REJECTED_OVERLOAD 
+                       && status != RequestSender.INTERNAL_ERROR) {
+               int sent = rs.getTotalSentBytes() + sentBytes;
+               int rcvd = rs.getTotalReceivedBytes() + receivedBytes;
+               if(key instanceof NodeSSK) {
+                       Logger.minor(this, "Remote SSK fetch cost 
"+sent+"/"+rcvd+" bytes ("+status+")");
+                       node.remoteSskFetchBytesSentAverage.report(sent);
+                       node.remoteSskFetchBytesReceivedAverage.report(rcvd);
+               } else {
+                       Logger.minor(this, "Remote CHK fetch cost 
"+sent+"/"+rcvd+" bytes ("+status+")");
+                       node.remoteChkFetchBytesSentAverage.report(sent);
+                       node.remoteChkFetchBytesReceivedAverage.report(rcvd);
+               }
+            }
+
         }
     }

@@ -181,4 +205,20 @@
                        throw new IllegalStateException("Unknown key block 
type: "+block.getClass());
        }

+       private int sentBytes;
+       private int receivedBytes;
+       private final Object bytesSync = new Object();
+       
+       public void sentBytes(int x) {
+               synchronized(bytesSync) {
+                       sentBytes += x;
+               }
+       }
+
+       public void receivedBytes(int x) {
+               synchronized(bytesSync) {
+                       receivedBytes += x;
+               }
+       }
+
 }

Modified: trunk/freenet/src/freenet/node/RequestSender.java
===================================================================
--- trunk/freenet/src/freenet/node/RequestSender.java   2006-07-10 23:27:27 UTC 
(rev 9552)
+++ trunk/freenet/src/freenet/node/RequestSender.java   2006-07-10 23:35:54 UTC 
(rev 9553)
@@ -499,22 +499,6 @@
                throw new IllegalStateException("finish() called with "+code+" 
when was already "+status);
         status = code;

-        if((status != TIMED_OUT) && (status != GENERATED_REJECTED_OVERLOAD) && 
(status != INTERNAL_ERROR)) {
-               if(key instanceof NodeSSK) {
-               Logger.minor(this, "SSK fetch cost 
"+getTotalSentBytes()+"/"+getTotalReceivedBytes()+" bytes ("+status+")");
-               (source == null ? node.localSskFetchBytesSentAverage : 
node.remoteSskFetchBytesSentAverage)
-                               .report(getTotalSentBytes());
-               (source == null ? node.localSskFetchBytesReceivedAverage : 
node.remoteSskFetchBytesReceivedAverage)
-                               .report(getTotalReceivedBytes());
-               } else {
-               Logger.minor(this, "CHK fetch cost 
"+getTotalSentBytes()+"/"+getTotalReceivedBytes()+" bytes ("+status+")");
-               (source == null ? node.localChkFetchBytesSentAverage : 
node.remoteChkFetchBytesSentAverage)
-                       .report(getTotalSentBytes());
-               (source == null ? node.localChkFetchBytesReceivedAverage : 
node.remoteChkFetchBytesReceivedAverage)
-                               .report(getTotalReceivedBytes());
-               }
-        }
-
         synchronized(this) {
             notifyAll();
         }

Modified: trunk/freenet/src/freenet/node/SSKInsertHandler.java
===================================================================
--- trunk/freenet/src/freenet/node/SSKInsertHandler.java        2006-07-10 
23:27:27 UTC (rev 9552)
+++ trunk/freenet/src/freenet/node/SSKInsertHandler.java        2006-07-10 
23:35:54 UTC (rev 9553)
@@ -20,7 +20,7 @@
  * Handles an incoming SSK insert.
  * SSKs need their own insert/request classes, see comments in SSKInsertSender.
  */
-public class SSKInsertHandler implements Runnable {
+public class SSKInsertHandler implements Runnable, ByteCounter {

     static final int PUBKEY_TIMEOUT = 10000;

@@ -85,7 +85,7 @@
         Message accepted = DMT.createFNPSSKAccepted(uid, pubKey == null);

         try {
-                       source.send(accepted, null);
+                       source.send(accepted, this);
                } catch (NotConnectedException e1) {
                        Logger.minor(this, "Lost connection to source");
                        return;
@@ -98,7 +98,7 @@
                        MessageFilter mfPK = MessageFilter.create().setType(DMT.FNPSSKPubKey).setField(DMT.UID, uid).setSource(source).setTimeout(PUBKEY_TIMEOUT);

                        try {
-                               Message pk = node.usm.waitFor(mfPK, null);
+                               Message pk = node.usm.waitFor(mfPK, this);
                                if(pk == null) {
                                        Logger.normal(this, "Failed to receive FNPSSKPubKey for "+uid);
                                        return;
@@ -109,7 +109,7 @@
                                        Logger.minor(this, "Got pubkey on "+uid+" : "+pubKey);
                                        Message confirm = DMT.createFNPSSKPubKeyAccepted(uid);
                                        try {
-                                               source.sendAsync(confirm, null, 0, null);
+                                               source.sendAsync(confirm, null, 0, this);
                                        } catch (NotConnectedException e) {
                                                Logger.minor(this, "Lost connection to source on "+uid);
                                                return;
@@ -118,7 +118,7 @@
                                        Logger.error(this, "Invalid pubkey from "+source+" on "+uid);
                                        Message msg = DMT.createFNPDataInsertRejected(uid, DMT.DATA_INSERT_REJECTED_SSK_ERROR);
                                        try {
-                                               source.send(msg, null);
+                                               source.send(msg, this);
                                        } catch (NotConnectedException ee) {
                                                // Ignore
                                        }
@@ -137,7 +137,7 @@
                        Logger.error(this, "Invalid SSK from "+source, e1);
                        Message msg = DMT.createFNPDataInsertRejected(uid, DMT.DATA_INSERT_REJECTED_SSK_ERROR);
                        try {
-                               source.send(msg, null);
+                               source.send(msg, this);
                        } catch (NotConnectedException e) {
                                // Ignore
                        }
@@ -149,7 +149,7 @@
                if((storedBlock != null) && !storedBlock.equals(block)) {
                        Message msg = DMT.createFNPSSKDataFound(uid, storedBlock.getRawHeaders(), storedBlock.getRawData());
                        try {
-                               source.send(msg, null);
+                               source.send(msg, this);
                        } catch (NotConnectedException e) {
                                Logger.minor(this, "Lost connection to source on "+uid);
                        }
@@ -162,12 +162,12 @@
                Message msg = DMT.createFNPInsertReply(uid);
                sentSuccess = true;
                try {
-                               source.send(msg, null);
+                               source.send(msg, this);
                        } catch (NotConnectedException e) {
                                // Ignore
                        }
                        canCommit = true;
-            finish();
+            finish(SSKInsertSender.SUCCESS);
             return;
         }

@@ -191,7 +191,7 @@
                // Forward it
                Message m = DMT.createFNPRejectedOverload(uid, false);
                try {
-                                       source.send(m, null);
+                                       source.send(m, this);
                                } catch (NotConnectedException e) {
                                        Logger.minor(this, "Lost connection to source");
                                        return;
@@ -210,7 +210,7 @@
                                }
                Message msg = DMT.createFNPSSKDataFound(uid, headers, data);
                try {
-                       source.send(msg, null);
+                       source.send(msg, this);
                } catch (NotConnectedException e) {
                        Logger.minor(this, "Lost connection to source");
                        return;
@@ -231,7 +231,7 @@
                        (status == SSKInsertSender.INTERNAL_ERROR)) {
                 Message msg = DMT.createFNPRejectedOverload(uid, true);
                 try {
-                                       source.send(msg, null);
+                                       source.send(msg, this);
                                } catch (NotConnectedException e) {
                                        Logger.minor(this, "Lost connection to source");
                                        return;
@@ -240,7 +240,7 @@
                 if((status == SSKInsertSender.TIMED_OUT) ||
                                (status == SSKInsertSender.GENERATED_REJECTED_OVERLOAD))
                        canCommit = true;
-                finish();
+                finish(status);
                 return;
             }

@@ -253,7 +253,7 @@
                                        return;
                                }
                 canCommit = true;
-                finish();
+                finish(status);
                 return;
             }

@@ -267,7 +267,7 @@
                                        return;
                                }
                 canCommit = true;
-                finish();
+                finish(status);
                 return;
             }

@@ -279,7 +279,7 @@
                        } catch (NotConnectedException e) {
                                // Ignore
                        }
-            finish();
+            finish(status);
             return;
         }
     }
@@ -288,7 +288,7 @@
      * If canCommit, and we have received all the data, and it
      * verifies, then commit it.
      */
-    private void finish() {
+    private void finish(int code) {
        Logger.minor(this, "Finishing");

        if(canCommit) {
@@ -298,6 +298,44 @@
                                Logger.normal(this, "Collision on "+this);
                        }
        }
+       
+        if(code != SSKInsertSender.TIMED_OUT && code != SSKInsertSender.GENERATED_REJECTED_OVERLOAD &&
+                       code != SSKInsertSender.INTERNAL_ERROR && code != SSKInsertSender.ROUTE_REALLY_NOT_FOUND) {
+               int totalSent = getTotalSentBytes();
+               int totalReceived = getTotalReceivedBytes();
+               if(sender != null) {
+                       totalSent += sender.getTotalSentBytes();
+                       totalReceived += sender.getTotalReceivedBytes();
+               }
+                       Logger.minor(this, "Remote SSK insert cost "+totalSent+"/"+totalReceived+" bytes ("+code+")");
+               node.remoteSskInsertBytesSentAverage.report(totalSent);
+               node.remoteSskInsertBytesReceivedAverage.report(totalReceived);
+        }
+
     }
+
+    private final Object totalBytesSync = new Object();
+    private int totalBytesSent;
+    private int totalBytesReceived;

+       public void sentBytes(int x) {
+               synchronized(totalBytesSync) {
+                       totalBytesSent += x;
+               }
+       }
+
+       public void receivedBytes(int x) {
+               synchronized(totalBytesSync) {
+                       totalBytesReceived += x;
+               }
+       }
+       
+       public int getTotalSentBytes() {
+               return totalBytesSent;
+       }
+       
+       public int getTotalReceivedBytes() {
+               return totalBytesReceived;
+       }
+    
 }

Modified: trunk/freenet/src/freenet/node/SSKInsertSender.java
===================================================================
--- trunk/freenet/src/freenet/node/SSKInsertSender.java 2006-07-10 23:27:27 UTC (rev 9552)
+++ trunk/freenet/src/freenet/node/SSKInsertSender.java 2006-07-10 23:35:54 UTC (rev 9553)
@@ -451,15 +451,6 @@
             notifyAll();
         }

-        if((code != TIMED_OUT) && (code != GENERATED_REJECTED_OVERLOAD) && (code != INTERNAL_ERROR)
-                       && (code != ROUTE_REALLY_NOT_FOUND)) {
-               Logger.minor(this, "SSK insert cost "+getTotalSentBytes()+"/"+getTotalReceivedBytes()+" bytes ("+code+")");
-               (source == null ? node.localChkInsertBytesSentAverage : node.remoteChkInsertBytesSentAverage)
-                               .report(getTotalSentBytes());
-               (source == null ? node.localChkInsertBytesReceivedAverage : node.remoteChkInsertBytesReceivedAverage)
-                               .report(getTotalReceivedBytes());
-        }
-        
         Logger.minor(this, "Set status code: "+getStatusString());
         // Nothing to wait for, no downstream transfers, just exit.
     }

Modified: trunk/freenet/src/freenet/node/Version.java
===================================================================
--- trunk/freenet/src/freenet/node/Version.java 2006-07-10 23:27:27 UTC (rev 9552)
+++ trunk/freenet/src/freenet/node/Version.java 2006-07-10 23:35:54 UTC (rev 9553)
@@ -18,12 +18,12 @@
        public static final String protocolVersion = "1.0";

        /** The build number of the current revision */
-       private static final int buildNumber = 870;
+       private static final int buildNumber = 871;

        /** Oldest build of Fred we will talk to */
        private static final int oldLastGoodBuild = 844;
-       private static final int newLastGoodBuild = 868;
-       private static final long transitionTime = 1152410400000L; // 2:00 GMT 9/07/06
+       private static final int newLastGoodBuild = 871;
+       private static final long transitionTime = 1153094400000L; // 0:00 GMT 17/07/06

        public static final int buildNumber() {
                return buildNumber;


Reply via email to