Author: robert
Date: 2008-01-21 19:17:14 +0000 (Mon, 21 Jan 2008)
New Revision: 17190

Modified:
   trunk/freenet/src/freenet/io/xfer/BlockTransmitter.java
   trunk/freenet/src/freenet/node/RequestHandler.java
   trunk/freenet/src/freenet/node/RequestSender.java
Log:
use callback for requestSender status; now only 1 thread per-request (rather 
than 2)


Modified: trunk/freenet/src/freenet/io/xfer/BlockTransmitter.java
===================================================================
--- trunk/freenet/src/freenet/io/xfer/BlockTransmitter.java     2008-01-21 
19:10:35 UTC (rev 17189)
+++ trunk/freenet/src/freenet/io/xfer/BlockTransmitter.java     2008-01-21 
19:17:14 UTC (rev 17190)
@@ -63,6 +63,8 @@
        final DoubleTokenBucket _masterThrottle;
        final ByteCounter _ctr;
        final int PACKET_SIZE;
+       private boolean asyncExitStatus;
+       private boolean asyncExitStatusSet;

        public BlockTransmitter(MessageCore usm, PeerContext destination, long 
uid, PartiallyReceivedBlock source, DoubleTokenBucket masterThrottle, 
ByteCounter ctr) {
                _usm = usm;
@@ -289,7 +291,16 @@
         */
        public void sendAsync(final Executor executor) {
                executor.execute(new Runnable() {
-                       public void run() { send(executor); } },
+                       public void run() {
+                                                try {
+                                                   
asyncExitStatus=send(executor);
+                                                } finally {
+                                                   synchronized 
(BlockTransmitter.this) {
+                                                      asyncExitStatusSet=true;
+                                                      
BlockTransmitter.this.notifyAll();
+                                                   }
+                                                }
+                                       } },
                        "BlockTransmitter:sendAsync() for "+this);
        }

@@ -304,6 +315,19 @@
                        }
                }
        }
+       
+       public boolean getAsyncExitStatus() {
+               synchronized (this) {
+                       while (!asyncExitStatusSet) {
+                               try {
+                                       this.wait(10*1000);
+                               } catch (InterruptedException e) {
+                                       //ignore
+                               }
+                       }
+               }
+               return asyncExitStatus;
+       }

        public PeerContext getDestination() {
                return _destination;

Modified: trunk/freenet/src/freenet/node/RequestHandler.java
===================================================================
--- trunk/freenet/src/freenet/node/RequestHandler.java  2008-01-21 19:10:35 UTC 
(rev 17189)
+++ trunk/freenet/src/freenet/node/RequestHandler.java  2008-01-21 19:17:14 UTC 
(rev 17190)
@@ -28,7 +28,7 @@
  * is separated off into RequestSender so we get transfer coalescing
  * and both ends for free. 
  */
-public class RequestHandler implements Runnable, ByteCounter {
+public class RequestHandler implements Runnable, ByteCounter, 
RequestSender.Listener {

        private static boolean logMINOR;
     final Message req;
@@ -45,6 +45,10 @@
     private RequestSender rs;
     private int status = RequestSender.NOT_FINISHED;
        private boolean appliedByteCounts=false;
+       private boolean sentRejectedOverload = false;
+       private long searchStartTime;
+       private long responseDeadline;
+       private BlockTransmitter bt;    

     public String toString() {
         return super.toString()+" for "+uid;
@@ -76,12 +80,14 @@
            freenet.support.Logger.OSThread.logPID(this);
         try {
                realRun();
+                       //The last thing that realRun() does is register as a 
request-sender listener, so any exception here is the end.
         } catch (NotConnectedException e) {
-               // Ignore, normal
+               Logger.normal(this, "requestor gone, could not start request 
handler wait");
+                       node.removeTransferringRequestHandler(uid);
+            node.unlockUID(uid, key instanceof NodeSSK, false, false);
         } catch (Throwable t) {
             Logger.error(this, "Caught "+t, t);
-        } finally {
-               node.removeTransferringRequestHandler(uid);
+                       node.removeTransferringRequestHandler(uid);
             node.unlockUID(uid, key instanceof NodeSSK, false, false);
         }
     }
@@ -150,26 +156,16 @@
             return;
         }

-        boolean shouldHaveStartedTransfer = false;
-        boolean sentRejectedOverload = false;
+        //If we cannot respond before this time, the 'source' node has already 
fatally timed out (and we need not return packets which will not be claimed)
+               searchStartTime = System.currentTimeMillis();
+               responseDeadline = searchStartTime + 
RequestSender.FETCH_TIMEOUT + source.getProbableSendQueueTime();
+        
+        rs.addListener(this);
+       }

-               //If we cannot respond before this time, the 'source' node has 
already fatally timed out (and we need not return packets which will not be 
claimed)
-               long searchStartTime = System.currentTimeMillis();
-               long responseDeadline = searchStartTime + 
RequestSender.FETCH_TIMEOUT + source.getProbableSendQueueTime();
-        short waitStatus = 0;
-        
-        while(true) {
-            
-               waitStatus = rs.waitUntilStatusChange(waitStatus);
-                       long now = System.currentTimeMillis();
-                       
-                       if (now > responseDeadline) {
-                               Logger.error(this, "requestsender took too long 
to respond to requestor ("+TimeUtil.formatTime((now - searchStartTime), 2, 
true)+"/"+rs.getStatus()+")"); 
-                               applyByteCounts();
-                               return;
-                       }
-                       
-            if((waitStatus & RequestSender.WAIT_REJECTED_OVERLOAD) != 0 && 
!sentRejectedOverload) {
+       public void onReceivedRejectOverload() {
+               try {
+            if(!sentRejectedOverload) {
                // Forward RejectedOverload
                                //Note: This message is only decernable from 
the terminal messages by the IS_LOCAL flag being false. (!IS_LOCAL)->!Terminal
                Message msg = DMT.createFNPRejectedOverload(uid, false);
@@ -177,17 +173,30 @@
                                //If the status changes (e.g. to SUCCESS), 
there is little need to send yet another reject overload.
                                sentRejectedOverload=true;
             }
-            
-            if((waitStatus & RequestSender.WAIT_TRANSFERRING_DATA) != 0) {
+               } catch (NotConnectedException e) {
+                       Logger.normal(this, "requestor is gone, can't forward 
reject overload");
+               }
+       }
+       
+       public void onCHKTransferBegins() {
+               try {
                // Is a CHK.
                 Message df = DMT.createFNPCHKDataFound(uid, rs.getHeaders());
                 source.sendAsync(df, null, 0, this);

                 PartiallyReceivedBlock prb = rs.getPRB();
-               BlockTransmitter bt =
+               bt =
                    new BlockTransmitter(node.usm, source, uid, prb, 
node.outputThrottle, this);
                node.addTransferringRequestHandler(uid);
-               if(bt.send(node.executor)) {
+                       bt.sendAsync(node.executor);
+               } catch (NotConnectedException e) {
+                       Logger.normal(this, "requestor is gone, can't begin CHK 
transfer");
+               }
+       }
+       
+       private void waitAndFinishCHKTransfer() throws NotConnectedException {
+               if (logMINOR) Logger.minor(this, "Waiting for CHK transfer to 
finish");
+               if(bt.getAsyncExitStatus()) {
                                        status = rs.getStatus();
                                // Successful CHK transfer, maybe path fold
                                finishOpennetChecked();
@@ -196,14 +205,24 @@
                                        status = rs.getStatus();
                                        //for byte logging, since the block is 
the 'terminal' message.
                                        applyByteCounts();
+                                       unregisterRequestHandlerWithNode();
                                }
-                   return;
-            }
+       }
+       
+       public void onRequestSenderFinished(int status) {
+               long now = System.currentTimeMillis();
+               
+               if (now > responseDeadline) {
+                       Logger.error(this, "requestsender took too long to 
respond to requestor ("+TimeUtil.formatTime((now - searchStartTime), 2, 
true)+"/"+rs.getStatus()+")"); 
+                       applyByteCounts();
+                       unregisterRequestHandlerWithNode();
+                       return;
+               }
+       
+               if(status == RequestSender.NOT_FINISHED)
+                       Logger.error(this, "onFinished() but not finished?");

-            status = rs.getStatus();
-
-            if(status == RequestSender.NOT_FINISHED) continue;
-            
+               try {
             switch(status) {
                case RequestSender.NOT_FINISHED:
                case RequestSender.DATA_NOT_FOUND:
@@ -239,45 +258,56 @@
                                } else {
                                        sendTerminal(df);
                                }
-                               return;
                        } else {
-                               if(!rs.transferStarted()) {
+                               if(bt == null) {
                                        // Bug! This is impossible!
                                        Logger.error(this, "Status is SUCCESS 
but we never started a transfer on "+uid);
-                                       // Could be a wierd synchronization 
bug, but we don't want to wait forever, so treat it as overload.
+                                       // Obviously this node is confused, 
send a terminal reject to make sure the requestor is not waiting forever.
                            reject = DMT.createFNPRejectedOverload(uid, true);
                                sendTerminal(reject);
-                               return;
                                } else {
-                                       // Race condition. We need to go around 
the loop again and pick up the data transfer
-                                       // in waitStatus.
+                                       waitAndFinishCHKTransfer();
                                }
-                               // Either way, go back around the loop.
-                               continue;
                        }
+                                       return;
                case RequestSender.VERIFY_FAILURE:
                        if(key instanceof NodeCHK) {
-                               if(shouldHaveStartedTransfer)
-                                       throw new IllegalStateException("Got 
status code "+status+" but transfer not started");
-                               shouldHaveStartedTransfer = true;
-                               continue; // should have started transfer
+                                               if(bt == null) {
+                                       // Bug! This is impossible!
+                                       Logger.error(this, "Status is 
VERIFY_FAILURE but we never started a transfer on "+uid);
+                                                       // Obviously this node 
is confused, send a terminal reject to make sure the requestor is not waiting 
forever.
+                           reject = DMT.createFNPRejectedOverload(uid, true);
+                               sendTerminal(reject);
+                               } else {
+                                                       //Verify fails after 
receive() is complete, so we might as well propagate it...
+                                       waitAndFinishCHKTransfer();
+                               }
+                                               return;
                        }
                    reject = DMT.createFNPRejectedOverload(uid, true);
                        sendTerminal(reject);
                        return;
                case RequestSender.TRANSFER_FAILED:
                        if(key instanceof NodeCHK) {
-                               if(shouldHaveStartedTransfer)
-                                       throw new IllegalStateException("Got 
status code "+status+" but transfer not started");
-                               shouldHaveStartedTransfer = true;
-                               continue; // should have started transfer
+                               if(bt == null) {
+                                       // Bug! This is impossible!
+                                       Logger.error(this, "Status is 
TRANSFER_FAILED but we never started a transfer on "+uid);
+                                                       // Obviously this node 
is confused, send a terminal reject to make sure the requestor is not waiting 
forever.
+                           reject = DMT.createFNPRejectedOverload(uid, true);
+                               sendTerminal(reject);
+                               } else {
+                                       waitAndFinishCHKTransfer();
+                               }
+                                               return;
                        }
-                       // Other side knows, right?
+                       Logger.error(this, "finish(TRANSFER_FAILED) should not 
be called on SSK?!?!");
                        return;
                default:
                    throw new IllegalStateException("Unknown status code 
"+status);
             }
-        }
+               } catch (NotConnectedException e) {
+                       Logger.normal(this, "requestor is gone, can't send 
terminal message");
+               }
        }

     /**
@@ -313,10 +343,16 @@
                        } else {
                 //also for byte logging, since the block is the 'terminal' 
message.
                 applyByteCounts();
+                               unregisterRequestHandlerWithNode();
                }
         }
        }

+       private void unregisterRequestHandlerWithNode() {
+               node.removeTransferringRequestHandler(uid);
+               node.unlockUID(uid, key instanceof NodeSSK, false, false);
+       }
+       
        /**
      * Sends the 'final' packet of a request in such a way that the thread can 
be freed (made non-runnable/exit)
      * and the byte counter will still be accurate.
@@ -352,6 +388,7 @@
                public void sent() {
             //For byte counting, this relies on the fact that the callback 
will only be excuted once.
                        applyByteCounts();
+                       unregisterRequestHandlerWithNode();
         }
        }

@@ -366,6 +403,7 @@
                                (node.passOpennetRefsThroughDarknet() || 
source.isOpennet()) &&
                   finishOpennetInner(om)) {
                        applyByteCounts();
+                       unregisterRequestHandlerWithNode();
                        return;
                }

@@ -383,6 +421,7 @@
                if(om != null && (source.isOpennet() || 
node.passOpennetRefsThroughDarknet()) &&
                   finishOpennetNoRelayInner(om)) {
                        applyByteCounts();
+                       unregisterRequestHandlerWithNode();
                        return;
                }


Modified: trunk/freenet/src/freenet/node/RequestSender.java
===================================================================
--- trunk/freenet/src/freenet/node/RequestSender.java   2008-01-21 19:10:35 UTC 
(rev 17189)
+++ trunk/freenet/src/freenet/node/RequestSender.java   2008-01-21 19:17:14 UTC 
(rev 17190)
@@ -3,7 +3,9 @@
  * http://www.gnu.org/ for further details of the GPL. */
 package freenet.node;

+import java.util.ArrayList;
 import java.util.HashSet;
+import java.util.Iterator;

 import freenet.crypt.CryptFormatException;
 import freenet.crypt.DSAPublicKey;
@@ -70,6 +72,8 @@
     private byte[] sskData;
     private SSKBlock block;
     private boolean hasForwarded;
+       
+       private ArrayList listeners=new ArrayList();

     // Terminal status
     // Always set finished AFTER setting the reason flag
@@ -433,7 +437,8 @@
                                synchronized(this) {
                                        notifyAll();
                                }
-                               
+                               fireCHKTransferBegins();
+                                               
                                BlockReceiver br = new BlockReceiver(node.usm, 
next, uid, prb, this);

                                try {
@@ -580,10 +585,13 @@
        private volatile boolean hasForwardedRejectedOverload;

     /** Forward RejectedOverload to the request originator */
-    private synchronized void forwardRejectedOverload() {
-       if(hasForwardedRejectedOverload) return;
-       hasForwardedRejectedOverload = true;
-               notifyAll();
+    private void forwardRejectedOverload() {
+               synchronized (this) {
+                       if(hasForwardedRejectedOverload) return;
+                       hasForwardedRejectedOverload = true;
+                       notifyAll();
+               }
+               fireReceivedRejectOverload();
        }

     public PartiallyReceivedBlock getPRB() {
@@ -659,20 +667,27 @@
             if(status == SUCCESS)
                successFrom = next;
         }
-        
+               
         if(status == SUCCESS) {
                if(next != null) {
                        next.onSuccess(false, key instanceof NodeSSK);
                }
                node.nodeStats.requestCompleted(true, source != null, key 
instanceof NodeSSK);

+                       //NOTE: because of the requesthandler implementation, 
this will block and wait
+                       //      for downstream transfers on a CHK. The opennet 
stuff introduces
+                       //      a delay of it's own if we don't get the 
expected message.
+                       fireRequestSenderFinished(code);
+                       
                if(key instanceof NodeCHK && next != null && 
                                (next.isOpennet() || 
node.passOpennetRefsThroughDarknet()) ) {
                        finishOpennet(next);
                } else
                        finishOpennetNull(next);
-        } else
+        } else {
                node.nodeStats.requestCompleted(false, source != null, key 
instanceof NodeSSK);
+                       fireRequestSenderFinished(code);
+               }

                synchronized(this) {
                        opennetFinished = true;
@@ -852,4 +867,72 @@
        public boolean isLocalRequestSearch() {
                return (source==null);
        }
+       
+       interface Listener {
+               void onReceivedRejectOverload();
+               void onCHKTransferBegins();
+               void onRequestSenderFinished(int status);
+       }
+       
+       public void addListener(Listener l) {
+               boolean reject=false;
+               boolean transfer=false;
+               int status;
+               synchronized (this) {
+                       synchronized (listeners) {
+                               listeners.add(l);
+                       }
+                       reject=hasForwardedRejectedOverload;
+                       transfer=transferStarted();
+                       status=this.status;
+               }
+               if (reject)
+                       l.onReceivedRejectOverload();
+               if (transfer)
+                       l.onCHKTransferBegins();
+               if (status!=NOT_FINISHED)
+                       l.onRequestSenderFinished(status);
+       }
+       
+       private void fireReceivedRejectOverload() {
+               synchronized (listeners) {
+                       Iterator i=listeners.iterator();
+                       while (i.hasNext()) {
+                               Listener l=(Listener)i.next();
+                               try {
+                                       l.onReceivedRejectOverload();
+                               } catch (Throwable t) {
+                                       Logger.error(this, "Caught: "+t, t);
+                               }
+                       }
+               }
+       }
+       
+       private void fireCHKTransferBegins() {
+               synchronized (listeners) {
+                       Iterator i=listeners.iterator();
+                       while (i.hasNext()) {
+                               Listener l=(Listener)i.next();
+                               try {
+                                       l.onCHKTransferBegins();
+                               } catch (Throwable t) {
+                                       Logger.error(this, "Caught: "+t, t);
+                               }
+                       }
+               }
+       }
+       
+       private void fireRequestSenderFinished(int status) {
+               synchronized (listeners) {
+                       Iterator i=listeners.iterator();
+                       while (i.hasNext()) {
+                               Listener l=(Listener)i.next();
+                               try {
+                                       l.onRequestSenderFinished(status);
+                               } catch (Throwable t) {
+                                       Logger.error(this, "Caught: "+t, t);
+                               }
+                       }
+               }
+       }
 }


Reply via email to