Author: nextgens
Date: 2007-03-28 12:29:21 +0000 (Wed, 28 Mar 2007)
New Revision: 12410

Added:
   trunk/freenet/src/freenet/node/StatusChangeCallback.java
Modified:
   trunk/freenet/src/freenet/node/NodeDispatcher.java
   trunk/freenet/src/freenet/node/RequestHandler.java
   trunk/freenet/src/freenet/node/RequestSender.java
Log:
*!EXPERIMENTAL CODE!*

Try to spare one thread per transferred request (RequestHandler).
If it works it's cool, if it doesn't it might break the network...
Maybe the timing isn't right for such a commit; if so, revert it :)

Modified: trunk/freenet/src/freenet/node/NodeDispatcher.java
===================================================================
--- trunk/freenet/src/freenet/node/NodeDispatcher.java  2007-03-28 01:49:26 UTC 
(rev 12409)
+++ trunk/freenet/src/freenet/node/NodeDispatcher.java  2007-03-28 12:29:21 UTC 
(rev 12410)
@@ -168,10 +168,8 @@
                        return true;
                }
                //if(!node.lockUID(id)) return false;
-               RequestHandler rh = new RequestHandler(m, id, node);
-               Thread t = new Thread(rh, "RequestHandler for UID "+id);
-               t.setDaemon(true);
-               t.start();
+               RequestHandler rh = new RequestHandler(m, id, node); // Do we 
need to keep a record of in flight RHs?
+               rh.run();
                return true;
        }


Modified: trunk/freenet/src/freenet/node/RequestHandler.java
===================================================================
--- trunk/freenet/src/freenet/node/RequestHandler.java  2007-03-28 01:49:26 UTC 
(rev 12409)
+++ trunk/freenet/src/freenet/node/RequestHandler.java  2007-03-28 12:29:21 UTC 
(rev 12410)
@@ -21,8 +21,13 @@
  * is separated off into RequestSender so we get transfer coalescing
  * and both ends for free. 
  */
-public class RequestHandler implements Runnable, ByteCounter {
+public class RequestHandler implements Runnable, ByteCounter, 
StatusChangeCallback {

+       private final static short INITIALIZE = 1;
+       private final static short WAIT_FOR_FIRST_REPLY = 2;
+       private final static short FINISHED = 3;
+       private short currentState = INITIALIZE;
+       
        private static boolean logMINOR;
     final Message req;
     final Node node;
@@ -35,9 +40,10 @@
     private boolean finalTransferFailed = false;
     final boolean resetClosestLoc;

-    public String toString() {
-        return super.toString()+" for "+uid;
-    }
+    private short waitStatus = 0;
+    private int status = RequestSender.NOT_FINISHED;
+    private boolean shouldHaveStartedTransfer = false;
+       private RequestSender rs = null;

     public RequestHandler(Message m, long id, Node n) {
         req = m;
@@ -60,56 +66,73 @@
     }

     public void run() {
-       int status = RequestSender.NOT_FINISHED;
-       RequestSender rs = null;
         try {
-        if(logMINOR) Logger.minor(this, "Handling a request: "+uid);
-        htl = source.decrementHTL(htl);
+               if(logMINOR) Logger.minor(this, "Handling a request: "+uid);
+               htl = source.decrementHTL(htl);
+
+               Message accepted = DMT.createFNPAccepted(uid);
+               source.sendAsync(accepted, null, 0, null);
+
+               Object o = node.makeRequestSender(key, htl, uid, source, 
closestLoc, resetClosestLoc, false, true, false);
+               if(o instanceof KeyBlock) {
+                       KeyBlock block = (KeyBlock) o;
+                       Message df = createDataFound(block);
+               source.sendAsync(df, null, 0, null);                    
+                       if(key instanceof NodeSSK) {
+                               if(needsPubKey) {
+                                       DSAPublicKey key = 
((NodeSSK)block.getKey()).getPubKey();
+                                       Message pk = 
DMT.createFNPSSKPubKey(uid, key.asBytes());
+                                       if(logMINOR) Logger.minor(this, 
"Sending PK: "+key+ ' ' +key.toLongString());
+                               source.sendAsync(pk, null, 0, null);
+                               }
+                               status = RequestSender.SUCCESS; // for byte 
logging
+                       }
+                       if(block instanceof CHKBlock) {
+                               PartiallyReceivedBlock prb =
+                                       new 
PartiallyReceivedBlock(Node.PACKETS_IN_BLOCK, Node.PACKET_SIZE, 
block.getRawData());
+                               BlockTransmitter bt =
+                                       new BlockTransmitter(node.usm, source, 
uid, prb, node.outputThrottle, this);
+                               node.addTransferringRequestHandler(uid);
+                               if(bt.send())
+                                       status = RequestSender.SUCCESS; // for 
byte logging
+                       }
+                       return;
+               }
+               rs = (RequestSender) o;
+
+               if(rs == null) { // ran out of htl?
+                       Message dnf = DMT.createFNPDataNotFound(uid);
+               source.sendAsync(dnf, null, 0, null);
+                       status = RequestSender.DATA_NOT_FOUND; // for byte 
logging
+                       return;
+               }
+               
+               synchronized (this) {
+                               currentState = WAIT_FOR_FIRST_REPLY;
+                       }
+               rs.callbackWhenStatusChange(this, waitStatus);
+
+        } catch (Throwable t) {
+               Logger.error(this, "Caught "+t, t);
+               _finally(); // Yes, we don't want the finally() block here 
anyway
+        } 
+    }
+    
+    public void onStatusChange(short mask) {
+       waitStatus = mask;
+       if(currentState == WAIT_FOR_FIRST_REPLY) {
+               waitForFirstReply();
+               synchronized (this) {
+                               currentState = FINISHED;
+                       }
+       }
+    }

-        Message accepted = DMT.createFNPAccepted(uid);
-        source.send(accepted, null);
-        
-        Object o = node.makeRequestSender(key, htl, uid, source, closestLoc, 
resetClosestLoc, false, true, false);
-        if(o instanceof KeyBlock) {
-            KeyBlock block = (KeyBlock) o;
-            Message df = createDataFound(block);
-            source.send(df, null);
-            if(key instanceof NodeSSK) {
-                if(needsPubKey) {
-                       DSAPublicKey key = 
((NodeSSK)block.getKey()).getPubKey();
-                       Message pk = DMT.createFNPSSKPubKey(uid, key.asBytes());
-                       if(logMINOR) Logger.minor(this, "Sending PK: "+key+ ' ' 
+key.toLongString());
-                       source.send(pk, null);
-                }
-                status = RequestSender.SUCCESS; // for byte logging
-            }
-            if(block instanceof CHKBlock) {
-               PartiallyReceivedBlock prb =
-                       new PartiallyReceivedBlock(Node.PACKETS_IN_BLOCK, 
Node.PACKET_SIZE, block.getRawData());
-               BlockTransmitter bt =
-                       new BlockTransmitter(node.usm, source, uid, prb, 
node.outputThrottle, this);
-               node.addTransferringRequestHandler(uid);
-               if(bt.send())
-                       status = RequestSender.SUCCESS; // for byte logging
-            }
-            return;
-        }
-        rs = (RequestSender) o;
-        
-        if(rs == null) { // ran out of htl?
-            Message dnf = DMT.createFNPDataNotFound(uid);
-            source.send(dnf, null);
-            status = RequestSender.DATA_NOT_FOUND; // for byte logging
-            return;
-        }
-        
-        boolean shouldHaveStartedTransfer = false;
-        
-        short waitStatus = 0;
-        
-        while(true) {
-            
-               waitStatus = rs.waitUntilStatusChange(waitStatus);
+    private void waitForFirstReply(){
+       synchronized (this) {
+                       if(currentState != WAIT_FOR_FIRST_REPLY) return;
+               }
+       try {
             if((waitStatus & RequestSender.WAIT_REJECTED_OVERLOAD) != 0) {
                // Forward RejectedOverload
                Message msg = DMT.createFNPRejectedOverload(uid, false);
@@ -119,7 +142,7 @@
             if((waitStatus & RequestSender.WAIT_TRANSFERRING_DATA) != 0) {
                // Is a CHK.
                 Message df = DMT.createFNPCHKDataFound(uid, rs.getHeaders());
-                source.send(df, null);
+               source.sendAsync(df, null, 0, null);
                 PartiallyReceivedBlock prb = rs.getPRB();
                BlockTransmitter bt =
                    new BlockTransmitter(node.usm, source, uid, prb, 
node.outputThrottle, this);
@@ -132,13 +155,13 @@

             status = rs.getStatus();

-            if(status == RequestSender.NOT_FINISHED) continue;
+            if(status == RequestSender.NOT_FINISHED) 
rs.callbackWhenStatusChange(this, waitStatus);

             switch(status) {
                case RequestSender.NOT_FINISHED:
                case RequestSender.DATA_NOT_FOUND:
                     Message dnf = DMT.createFNPDataNotFound(uid);
-                       source.send(dnf, this);
+                       source.sendAsync(dnf, null, 0, null);                   
 
                        return;
                case RequestSender.GENERATED_REJECTED_OVERLOAD:
                case RequestSender.TIMED_OUT:
@@ -146,21 +169,21 @@
                        // Locally generated.
                    // Propagate back to source who needs to reduce send rate
                    Message reject = DMT.createFNPRejectedOverload(uid, true);
-                       source.send(reject, this);
+                       source.sendAsync(reject, null, 0, null);
                        return;
                case RequestSender.ROUTE_NOT_FOUND:
                    // Tell source
                    Message rnf = DMT.createFNPRouteNotFound(uid, rs.getHTL());
-                       source.send(rnf, this);
+                       source.sendAsync(rnf, null, 0, null);
                        return;
                case RequestSender.SUCCESS:
                        if(key instanceof NodeSSK) {
                         Message df = DMT.createFNPSSKDataFound(uid, 
rs.getHeaders(), rs.getSSKData());
-                        source.send(df, this);
+                       source.sendAsync(df, null, 0, null);
                         node.sentPayload(rs.getSSKData().length);
                         if(needsPubKey) {
                                Message pk = DMT.createFNPSSKPubKey(uid, 
((NodeSSK)rs.getSSKBlock().getKey()).getPubKey().asBytes());
-                               source.send(pk, this);
+                               source.sendAsync(pk, null, 0, null);
                         }
                        } else if(!rs.transferStarted()) {
                                Logger.error(this, "Status is SUCCESS but we 
never started a transfer on "+uid);
@@ -171,55 +194,58 @@
                                if(shouldHaveStartedTransfer)
                                        throw new IllegalStateException("Got 
status code "+status+" but transfer not started");
                                shouldHaveStartedTransfer = true;
-                               continue; // should have started transfer
+                               rs.callbackWhenStatusChange(this, waitStatus);
                        }
                    reject = DMT.createFNPRejectedOverload(uid, true);
-                       source.send(reject, this);
+                       source.sendAsync(reject, null, 0, null);
                        return;
                case RequestSender.TRANSFER_FAILED:
                        if(key instanceof NodeCHK) {
                                if(shouldHaveStartedTransfer)
                                        throw new IllegalStateException("Got 
status code "+status+" but transfer not started");
                                shouldHaveStartedTransfer = true;
-                               continue; // should have started transfer
+                               rs.callbackWhenStatusChange(this, waitStatus);
                        }
                        // Other side knows, right?
                        return;
                default:
                    throw new IllegalStateException("Unknown status code 
"+status);
             }
-        }
         } catch (Throwable t) {
             Logger.error(this, "Caught "+t, t);
         } finally {
-               node.removeTransferringRequestHandler(uid);
-            node.unlockUID(uid, key instanceof NodeSSK, false);
-            if((!finalTransferFailed) && rs != null && status != 
RequestSender.TIMED_OUT && status != RequestSender.GENERATED_REJECTED_OVERLOAD 
-                       && status != RequestSender.INTERNAL_ERROR) {
-               int sent = rs.getTotalSentBytes() + sentBytes;
-               int rcvd = rs.getTotalReceivedBytes() + receivedBytes;
-               if(key instanceof NodeSSK) {
-                       if(logMINOR) Logger.minor(this, "Remote SSK fetch cost 
"+sent+ '/' +rcvd+" bytes ("+status+ ')');
-                       
node.nodeStats.remoteSskFetchBytesSentAverage.report(sent);
-                       
node.nodeStats.remoteSskFetchBytesReceivedAverage.report(rcvd);
-                       if(status == RequestSender.SUCCESS) {
-                               
node.nodeStats.successfulSskFetchBytesSentAverage.report(sent);
-                               
node.nodeStats.successfulSskFetchBytesReceivedAverage.report(sent);
-                       }
-               } else {
-                       if(logMINOR) Logger.minor(this, "Remote CHK fetch cost 
"+sent+ '/' +rcvd+" bytes ("+status+ ')');
-                       
node.nodeStats.remoteChkFetchBytesSentAverage.report(sent);
-                       
node.nodeStats.remoteChkFetchBytesReceivedAverage.report(rcvd);
-                       if(status == RequestSender.SUCCESS) {
-                               
node.nodeStats.successfulChkFetchBytesSentAverage.report(sent);
-                               
node.nodeStats.successfulChkFetchBytesReceivedAverage.report(sent);
-                       }
-               }
-            }
-
+               _finally();
         }
     }

+       private synchronized void _finally() {
+               currentState = FINISHED;
+               node.removeTransferringRequestHandler(uid);
+        node.unlockUID(uid, key instanceof NodeSSK, false);
+        if((!finalTransferFailed) && rs != null && status != 
RequestSender.TIMED_OUT && status != RequestSender.GENERATED_REJECTED_OVERLOAD 
+                       && status != RequestSender.INTERNAL_ERROR) {
+               int sent = rs.getTotalSentBytes() + sentBytes;
+               int rcvd = rs.getTotalReceivedBytes() + receivedBytes;
+               if(key instanceof NodeSSK) {
+                       if(logMINOR) Logger.minor(this, "Remote SSK fetch cost 
"+sent+ '/' +rcvd+" bytes ("+status+ ')');
+               node.nodeStats.remoteSskFetchBytesSentAverage.report(sent);
+               node.nodeStats.remoteSskFetchBytesReceivedAverage.report(rcvd);
+               if(status == RequestSender.SUCCESS) {
+                       
node.nodeStats.successfulSskFetchBytesSentAverage.report(sent);
+                       
node.nodeStats.successfulSskFetchBytesReceivedAverage.report(sent);
+               }
+               } else {
+                       if(logMINOR) Logger.minor(this, "Remote CHK fetch cost 
"+sent+ '/' +rcvd+" bytes ("+status+ ')');
+               node.nodeStats.remoteChkFetchBytesSentAverage.report(sent);
+               node.nodeStats.remoteChkFetchBytesReceivedAverage.report(rcvd);
+               if(status == RequestSender.SUCCESS) {
+                       
node.nodeStats.successfulChkFetchBytesSentAverage.report(sent);
+                       
node.nodeStats.successfulChkFetchBytesReceivedAverage.report(sent);
+               }
+               }
+        }
+       }
+
        private Message createDataFound(KeyBlock block) {
                if(block instanceof CHKBlock)
                        return DMT.createFNPCHKDataFound(uid, 
block.getRawHeaders());
@@ -249,5 +275,8 @@
        public void sentPayload(int x) {
                node.sentPayload(x);
        }
-
+       
+    public String toString() {
+        return super.toString()+" for "+uid;
+    }
 }

Modified: trunk/freenet/src/freenet/node/RequestSender.java
===================================================================
--- trunk/freenet/src/freenet/node/RequestSender.java   2007-03-28 01:49:26 UTC 
(rev 12409)
+++ trunk/freenet/src/freenet/node/RequestSender.java   2007-03-28 12:29:21 UTC 
(rev 12410)
@@ -519,7 +519,53 @@
         }
     }

+
     /**
+     * Same as waitUntilStatusChange but non blocking
+     * @see waitUntilStatusChange(short mask)
+     */
+    public void callbackWhenStatusChange(final StatusChangeCallback cb, final 
short mask) {
+       
+       if(mask == WAIT_ALL) throw new IllegalArgumentException("Cannot ignore 
all!");
+       
+       Runnable whenStatusChange = new Runnable(){
+               boolean isRunning = false;
+               
+               public void run(){
+                       synchronized (this) {
+                                       if(isRunning) return;
+                                       isRunning = true;
+                               }
+                       _realRun();
+                       isRunning = false;
+               }
+               
+               private void _realRun() {
+                       short current = mask; // If any bits are set already, 
we ignore those states.
+               
+                       synchronized (cb) {
+                               if(hasForwardedRejectedOverload)
+                                       current |= WAIT_REJECTED_OVERLOAD;
+                       
+                               if(prb != null)
+                                       current |= WAIT_TRANSFERRING_DATA;
+                       
+                       if(status != NOT_FINISHED)
+                               current |= WAIT_FINISHED;
+       
+                               }
+                       
+               if(current != mask)
+                       cb.onStatusChange(current);
+               else
+                       node.ps.queueTimedJob(this, 10000);
+               }
+       };
+       
+       whenStatusChange.run();
+    }
+        
+    /**
      * Wait until we have a terminal status code.
      */
     public synchronized void waitUntilFinished() {

Added: trunk/freenet/src/freenet/node/StatusChangeCallback.java
===================================================================
--- trunk/freenet/src/freenet/node/StatusChangeCallback.java                    
        (rev 0)
+++ trunk/freenet/src/freenet/node/StatusChangeCallback.java    2007-03-28 
12:29:21 UTC (rev 12410)
@@ -0,0 +1,8 @@
+/* This code is part of Freenet. It is distributed under the GNU General
+ * Public License, version 2 (or at your option any later version). See
+ * http://www.gnu.org/ for further details of the GPL. */
+package freenet.node;
+
+public interface StatusChangeCallback {
+       public abstract void onStatusChange(short mask);
+}
\ No newline at end of file


Reply via email to