Author: robert
Date: 2007-12-06 20:24:08 +0000 (Thu, 06 Dec 2007)
New Revision: 16372

Modified:
   trunk/freenet/src/freenet/node/MessageItem.java
   trunk/freenet/src/freenet/node/RequestHandler.java
Log:
Don't hang onto a RequestHandler thread just to track the bytes.


Modified: trunk/freenet/src/freenet/node/MessageItem.java
===================================================================
--- trunk/freenet/src/freenet/node/MessageItem.java     2007-12-06 19:40:42 UTC 
(rev 16371)
+++ trunk/freenet/src/freenet/node/MessageItem.java     2007-12-06 20:24:08 UTC 
(rev 16372)
@@ -54,6 +54,7 @@
     }

        public void onSent(int length) {
+        //NB: The fact that the bytes are counted before callback 
notifications is important for load management.
                if(ctrCallback != null) {
                        try {
                                ctrCallback.sentBytes(length);

Modified: trunk/freenet/src/freenet/node/RequestHandler.java
===================================================================
--- trunk/freenet/src/freenet/node/RequestHandler.java  2007-12-06 19:40:42 UTC 
(rev 16371)
+++ trunk/freenet/src/freenet/node/RequestHandler.java  2007-12-06 20:24:08 UTC 
(rev 16372)
@@ -4,6 +4,7 @@
 package freenet.node;

 import freenet.crypt.DSAPublicKey;
+import freenet.io.comm.AsyncMessageCallback;
 import freenet.io.comm.ByteCounter;
 import freenet.io.comm.DMT;
 import freenet.io.comm.Message;
@@ -82,8 +83,12 @@
         } finally {
                node.removeTransferringRequestHandler(uid);
             node.unlockUID(uid, key instanceof NodeSSK, false, false);
-            if((!finalTransferFailed) && rs != null && status != 
RequestSender.TIMED_OUT && status != RequestSender.GENERATED_REJECTED_OVERLOAD 
-                       && status != RequestSender.INTERNAL_ERROR && !thrown) {
+        }
+    }
+    
+    private void applyByteCounts() {
+        if((!finalTransferFailed) && rs != null && status != 
RequestSender.TIMED_OUT && status != RequestSender.GENERATED_REJECTED_OVERLOAD 
+           && status != RequestSender.INTERNAL_ERROR) {
                int sent, rcvd;
                synchronized(bytesSync) {
                        sent = sentBytes;
@@ -99,6 +104,7 @@
                                // Can report both parts, because we had both a 
Handler and a Sender
                                
node.nodeStats.successfulSskFetchBytesSentAverage.report(sent);
                                
node.nodeStats.successfulSskFetchBytesReceivedAverage.report(rcvd);
+                        node.sentPayload(rs.getSSKData().length); // won't be 
sentPayload()ed by BlockTransmitter
                        }
                } else {
                        if(logMINOR) Logger.minor(this, "Remote CHK fetch cost 
"+sent+ '/' +rcvd+" bytes ("+status+ ')');
@@ -111,8 +117,6 @@
                        }
                }
             }
-
-        }
     }

     private void realRun() throws NotConnectedException {
@@ -127,13 +131,13 @@
         if(o instanceof KeyBlock) {
             KeyBlock block = (KeyBlock) o;
             Message df = createDataFound(block);
-            source.sendSync(df, null);
+            source.sendAsync(df, null, 0, this);
             if(key instanceof NodeSSK) {
                 if(needsPubKey) {
                        DSAPublicKey key = 
((NodeSSK)block.getKey()).getPubKey();
                        Message pk = DMT.createFNPSSKPubKey(uid, key);
                        if(logMINOR) Logger.minor(this, "Sending PK: "+key+ ' ' 
+key.toLongString());
-                       source.sendSync(pk, null);
+                       sendTerminal(pk);
                 }
                 status = RequestSender.SUCCESS; // for byte logging
             }
@@ -144,11 +148,14 @@
                        new BlockTransmitter(node.usm, source, uid, prb, 
node.outputThrottle, this);
                node.addTransferringRequestHandler(uid);
                if(bt.send(node.executor)) {
-                       status = RequestSender.SUCCESS; // for byte logging
+                    // for byte logging
+                       status = RequestSender.SUCCESS;
                        // We've fetched it from our datastore, so there won't 
be a downstream noderef.
                        // But we want to send at least an 
FNPOpennetCompletedAck, otherwise the request source
                        // may have to timeout waiting for one.
                                finishOpennetNoRelay();
+                    //also for byte logging, since the block is the 'terminal' 
message.
+                    applyByteCounts();
                }
             }
             return;
@@ -157,8 +164,8 @@

         if(rs == null) { // ran out of htl?
             Message dnf = DMT.createFNPDataNotFound(uid);
-            source.sendSync(dnf, null);
             status = RequestSender.DATA_NOT_FOUND; // for byte logging
+            sendTerminal(dnf);
             return;
         }

@@ -172,13 +179,14 @@
             if((waitStatus & RequestSender.WAIT_REJECTED_OVERLOAD) != 0) {
                // Forward RejectedOverload
                Message msg = DMT.createFNPRejectedOverload(uid, false);
-               source.sendAsync(msg, null, 0, null);
+               source.sendAsync(msg, null, 0, this);
             }

             if((waitStatus & RequestSender.WAIT_TRANSFERRING_DATA) != 0) {
                // Is a CHK.
                 Message df = DMT.createFNPCHKDataFound(uid, rs.getHeaders());
-                source.sendSync(df, null);
+                source.sendAsync(df, null, 0, this);
+                
                 PartiallyReceivedBlock prb = rs.getPRB();
                BlockTransmitter bt =
                    new BlockTransmitter(node.usm, source, uid, prb, 
node.outputThrottle, this);
@@ -190,6 +198,8 @@
                                finishOpennetChecked();
                }
                                status = rs.getStatus();
+                //for byte logging, since the block is the 'terminal' message.
+                applyByteCounts();
                    return;
             }

@@ -201,11 +211,11 @@
                case RequestSender.NOT_FINISHED:
                case RequestSender.DATA_NOT_FOUND:
                     Message dnf = DMT.createFNPDataNotFound(uid);
-                       source.sendSync(dnf, this);
+                       sendTerminal(dnf);
                        return;
                case RequestSender.RECENTLY_FAILED:
                        Message rf = DMT.createFNPRecentlyFailed(uid, 
rs.getRecentlyFailedTimeLeft());
-                       source.sendSync(rf, this);
+                       sendTerminal(rf);
                        return;
                case RequestSender.GENERATED_REJECTED_OVERLOAD:
                case RequestSender.TIMED_OUT:
@@ -213,21 +223,22 @@
                        // Locally generated.
                    // Propagate back to source who needs to reduce send rate
                    Message reject = DMT.createFNPRejectedOverload(uid, true);
-                       source.sendSync(reject, this);
+                       sendTerminal(reject);
                        return;
                case RequestSender.ROUTE_NOT_FOUND:
                    // Tell source
                    Message rnf = DMT.createFNPRouteNotFound(uid, rs.getHTL());
-                       source.sendSync(rnf, this);
+                       sendTerminal(rnf);
                        return;
                case RequestSender.SUCCESS:
                        if(key instanceof NodeSSK) {
                         Message df = DMT.createFNPSSKDataFound(uid, 
rs.getHeaders(), rs.getSSKData());
-                        source.sendSync(df, this);
-                        node.sentPayload(rs.getSSKData().length); // won't be 
sentPayload()ed by BlockTransmitter
                         if(needsPubKey) {
+                            source.sendAsync(df, null, 0, this);
                                Message pk = DMT.createFNPSSKPubKey(uid, 
((NodeSSK)rs.getSSKBlock().getKey()).getPubKey());
-                               source.sendSync(pk, this);
+                               sendTerminal(pk);
+                        } else {
+                            sendTerminal(df);
                         }
                                return;
                        } else {
@@ -244,7 +255,7 @@
                                continue; // should have started transfer
                        }
                    reject = DMT.createFNPRejectedOverload(uid, true);
-                       source.sendSync(reject, this);
+                       sendTerminal(reject);
                        return;
                case RequestSender.TRANSFER_FAILED:
                        if(key instanceof NodeCHK) {
@@ -262,6 +273,49 @@
        }

     /**
+     * Sends the 'final' packet of a request in such a way that the thread can 
be freed (made non-runnable/exit)
+     * and the byte counter will still be accurate.
+     */
+    private void sendTerminal(Message msg) throws NotConnectedException {
+        if (sendTerminalCalled)
+            throw new IllegalStateException("sendTerminal should only be 
called once");
+        else
+            sendTerminalCalled=true;
+        
+        source.sendAsync(msg, new TerminalMessageByteCountCollector(), 0, 
this);
+    }
+    
+    boolean sendTerminalCalled=false;
+    
+    /**
+     * Note well! These functions are not executed on the RequestHandler 
thread.
+     */
+    private class TerminalMessageByteCountCollector implements 
AsyncMessageCallback {
+        
+               public void acknowledged() {
+            //terminalMessage ack'd by remote peer
+               }
+        
+               public void disconnected() {
+            Logger.minor(this, "Peer disconnected before terminal message sent 
for "+RequestHandler.this);
+               }
+        
+               public void fatalError() {
+                       Logger.error(this, "Error sending terminal message?! 
for " + RequestHandler.this);
+               }
+        
+        private boolean once=true;
+               public void sent() {
+            //For byte counting, this relies on the fact that the callback 
will only be excuted once. This check might be paranoid.
+            if (once) {
+                applyByteCounts();
+            } else {
+                Logger.error(this, "terminalMessage sent multiple times? for " 
+ RequestHandler.this);
+            }
+        }
+       }
+    
+    /**
      * Either send an ack, indicating we've finished and aren't interested in 
opennet, 
      * or wait for a noderef and relay it and wait for a response and relay 
that,
      * or send our own noderef and wait for a response and add that.
@@ -406,6 +460,7 @@
                if(block instanceof CHKBlock)
                        return DMT.createFNPCHKDataFound(uid, 
block.getRawHeaders());
                else if(block instanceof SSKBlock) {
+            // FIXME called before payload is actually sent
                        node.sentPayload(block.getRawData().length); // won't 
be sentPayload()ed by BlockTransmitter
                        return DMT.createFNPSSKDataFound(uid, 
block.getRawHeaders(), block.getRawData());
                } else
@@ -431,5 +486,5 @@
        public void sentPayload(int x) {
                node.sentPayload(x);
        }
-
+    
 }


Reply via email to