Author: nextgens
Date: 2007-03-28 22:12:30 +0000 (Wed, 28 Mar 2007)
New Revision: 12417

Removed:
   trunk/freenet/src/freenet/node/StatusChangeCallback.java
Modified:
   trunk/freenet/src/freenet/node/NodeDispatcher.java
   trunk/freenet/src/freenet/node/RequestHandler.java
   trunk/freenet/src/freenet/node/RequestSender.java
Log:
Revert since r12409; my code isn't ready for a release ... And it seems it's 
time for #1021 ... (BlockTransmitter has to be refactored; we can't keep the 
callback on RequestHandler for too long: it hangs the PacketSender loop)

Modified: trunk/freenet/src/freenet/node/NodeDispatcher.java
===================================================================
--- trunk/freenet/src/freenet/node/NodeDispatcher.java  2007-03-28 21:56:36 UTC 
(rev 12416)
+++ trunk/freenet/src/freenet/node/NodeDispatcher.java  2007-03-28 22:12:30 UTC 
(rev 12417)
@@ -168,8 +168,10 @@
                        return true;
                }
                //if(!node.lockUID(id)) return false;
-               RequestHandler rh = new RequestHandler(m, id, node); // Do we 
need to keep a record of in flight RHs?
-               rh.run();
+               RequestHandler rh = new RequestHandler(m, id, node);
+               Thread t = new Thread(rh, "RequestHandler for UID "+id);
+               t.setDaemon(true);
+               t.start();
                return true;
        }


Modified: trunk/freenet/src/freenet/node/RequestHandler.java
===================================================================
--- trunk/freenet/src/freenet/node/RequestHandler.java  2007-03-28 21:56:36 UTC 
(rev 12416)
+++ trunk/freenet/src/freenet/node/RequestHandler.java  2007-03-28 22:12:30 UTC 
(rev 12417)
@@ -21,13 +21,8 @@
  * is separated off into RequestSender so we get transfer coalescing
  * and both ends for free. 
  */
-public class RequestHandler implements ByteCounter, StatusChangeCallback {
+public class RequestHandler implements Runnable, ByteCounter {

-       private final static short INITIALIZE = 1;
-       private final static short WAIT_FOR_FIRST_REPLY = 2;
-       private final static short FINISHED = 3;
-       private short currentState = INITIALIZE;
-       
        private static boolean logMINOR;
     final Message req;
     final Node node;
@@ -40,10 +35,9 @@
     private boolean finalTransferFailed = false;
     final boolean resetClosestLoc;

-    private short waitStatus = 0;
-    private int status = RequestSender.NOT_FINISHED;
-    private boolean shouldHaveStartedTransfer = false;
-       private RequestSender rs = null;
+    public String toString() {
+        return super.toString()+" for "+uid;
+    }

     public RequestHandler(Message m, long id, Node n) {
         req = m;
@@ -66,73 +60,56 @@
     }

     public void run() {
+       int status = RequestSender.NOT_FINISHED;
+       RequestSender rs = null;
         try {
-               if(logMINOR) Logger.minor(this, "Handling a request: "+uid);
-               htl = source.decrementHTL(htl);
-
-               Message accepted = DMT.createFNPAccepted(uid);
-               source.sendAsync(accepted, null, 0, null);
-
-               Object o = node.makeRequestSender(key, htl, uid, source, 
closestLoc, resetClosestLoc, false, true, false);
-               if(o instanceof KeyBlock) {
-                       KeyBlock block = (KeyBlock) o;
-                       Message df = createDataFound(block);
-               source.sendAsync(df, null, 0, null);                    
-                       if(key instanceof NodeSSK) {
-                               if(needsPubKey) {
-                                       DSAPublicKey key = 
((NodeSSK)block.getKey()).getPubKey();
-                                       Message pk = 
DMT.createFNPSSKPubKey(uid, key.asBytes());
-                                       if(logMINOR) Logger.minor(this, 
"Sending PK: "+key+ ' ' +key.toLongString());
-                               source.sendAsync(pk, null, 0, null);
-                               }
-                               status = RequestSender.SUCCESS; // for byte 
logging
-                       }
-                       if(block instanceof CHKBlock) {
-                               PartiallyReceivedBlock prb =
-                                       new 
PartiallyReceivedBlock(Node.PACKETS_IN_BLOCK, Node.PACKET_SIZE, 
block.getRawData());
-                               BlockTransmitter bt =
-                                       new BlockTransmitter(node.usm, source, 
uid, prb, node.outputThrottle, this);
-                               node.addTransferringRequestHandler(uid);
-                               if(bt.send())
-                                       status = RequestSender.SUCCESS; // for 
byte logging
-                       }
-                       return;
-               }
-               rs = (RequestSender) o;
-
-               if(rs == null) { // ran out of htl?
-                       Message dnf = DMT.createFNPDataNotFound(uid);
-               source.sendAsync(dnf, null, 0, null);
-                       status = RequestSender.DATA_NOT_FOUND; // for byte 
logging
-                       return;
-               }
-               
-               synchronized (this) {
-                               currentState = WAIT_FOR_FIRST_REPLY;
-                       }
-               rs.callbackWhenStatusChange(this, waitStatus);
-
-        } catch (Throwable t) {
-               Logger.error(this, "Caught "+t, t);
-               _finally(); // Yes, we don't want the finally() block here 
anyway
-        } 
-    }
-    
-    public void onStatusChange(short mask) {
-       waitStatus = mask;
-       if(currentState == WAIT_FOR_FIRST_REPLY) {
-               waitForFirstReply();
-               synchronized (this) {
-                               currentState = FINISHED;
-                       }
-       }
-    }
+        if(logMINOR) Logger.minor(this, "Handling a request: "+uid);
+        htl = source.decrementHTL(htl);

-    private void waitForFirstReply(){
-       synchronized (this) {
-                       if(currentState != WAIT_FOR_FIRST_REPLY) return;
-               }
-       try {
+        Message accepted = DMT.createFNPAccepted(uid);
+        source.send(accepted, null);
+        
+        Object o = node.makeRequestSender(key, htl, uid, source, closestLoc, 
resetClosestLoc, false, true, false);
+        if(o instanceof KeyBlock) {
+            KeyBlock block = (KeyBlock) o;
+            Message df = createDataFound(block);
+            source.send(df, null);
+            if(key instanceof NodeSSK) {
+                if(needsPubKey) {
+                       DSAPublicKey key = 
((NodeSSK)block.getKey()).getPubKey();
+                       Message pk = DMT.createFNPSSKPubKey(uid, key.asBytes());
+                       if(logMINOR) Logger.minor(this, "Sending PK: "+key+ ' ' 
+key.toLongString());
+                       source.send(pk, null);
+                }
+                status = RequestSender.SUCCESS; // for byte logging
+            }
+            if(block instanceof CHKBlock) {
+               PartiallyReceivedBlock prb =
+                       new PartiallyReceivedBlock(Node.PACKETS_IN_BLOCK, 
Node.PACKET_SIZE, block.getRawData());
+               BlockTransmitter bt =
+                       new BlockTransmitter(node.usm, source, uid, prb, 
node.outputThrottle, this);
+               node.addTransferringRequestHandler(uid);
+               if(bt.send())
+                       status = RequestSender.SUCCESS; // for byte logging
+            }
+            return;
+        }
+        rs = (RequestSender) o;
+        
+        if(rs == null) { // ran out of htl?
+            Message dnf = DMT.createFNPDataNotFound(uid);
+            source.send(dnf, null);
+            status = RequestSender.DATA_NOT_FOUND; // for byte logging
+            return;
+        }
+        
+        boolean shouldHaveStartedTransfer = false;
+        
+        short waitStatus = 0;
+        
+        while(true) {
+            
+               waitStatus = rs.waitUntilStatusChange(waitStatus);
             if((waitStatus & RequestSender.WAIT_REJECTED_OVERLOAD) != 0) {
                // Forward RejectedOverload
                Message msg = DMT.createFNPRejectedOverload(uid, false);
@@ -142,7 +119,7 @@
             if((waitStatus & RequestSender.WAIT_TRANSFERRING_DATA) != 0) {
                // Is a CHK.
                 Message df = DMT.createFNPCHKDataFound(uid, rs.getHeaders());
-               source.sendAsync(df, null, 0, null);
+                source.send(df, null);
                 PartiallyReceivedBlock prb = rs.getPRB();
                BlockTransmitter bt =
                    new BlockTransmitter(node.usm, source, uid, prb, 
node.outputThrottle, this);
@@ -155,13 +132,13 @@

             status = rs.getStatus();

-            if(status == RequestSender.NOT_FINISHED) 
rs.callbackWhenStatusChange(this, waitStatus);
+            if(status == RequestSender.NOT_FINISHED) continue;

             switch(status) {
                case RequestSender.NOT_FINISHED:
                case RequestSender.DATA_NOT_FOUND:
                     Message dnf = DMT.createFNPDataNotFound(uid);
-                       source.sendAsync(dnf, null, 0, null);                   
 
+                       source.send(dnf, this);
                        return;
                case RequestSender.GENERATED_REJECTED_OVERLOAD:
                case RequestSender.TIMED_OUT:
@@ -169,21 +146,21 @@
                        // Locally generated.
                    // Propagate back to source who needs to reduce send rate
                    Message reject = DMT.createFNPRejectedOverload(uid, true);
-                       source.sendAsync(reject, null, 0, null);
+                       source.send(reject, this);
                        return;
                case RequestSender.ROUTE_NOT_FOUND:
                    // Tell source
                    Message rnf = DMT.createFNPRouteNotFound(uid, rs.getHTL());
-                       source.sendAsync(rnf, null, 0, null);
+                       source.send(rnf, this);
                        return;
                case RequestSender.SUCCESS:
                        if(key instanceof NodeSSK) {
                         Message df = DMT.createFNPSSKDataFound(uid, 
rs.getHeaders(), rs.getSSKData());
-                       source.sendAsync(df, null, 0, null);
+                        source.send(df, this);
                         node.sentPayload(rs.getSSKData().length);
                         if(needsPubKey) {
                                Message pk = DMT.createFNPSSKPubKey(uid, 
((NodeSSK)rs.getSSKBlock().getKey()).getPubKey().asBytes());
-                               source.sendAsync(pk, null, 0, null);
+                               source.send(pk, this);
                         }
                        } else if(!rs.transferStarted()) {
                                Logger.error(this, "Status is SUCCESS but we 
never started a transfer on "+uid);
@@ -194,60 +171,55 @@
                                if(shouldHaveStartedTransfer)
                                        throw new IllegalStateException("Got 
status code "+status+" but transfer not started");
                                shouldHaveStartedTransfer = true;
-                               rs.callbackWhenStatusChange(this, waitStatus);
+                               continue; // should have started transfer
                        }
                    reject = DMT.createFNPRejectedOverload(uid, true);
-                       source.sendAsync(reject, null, 0, null);
+                       source.send(reject, this);
                        return;
                case RequestSender.TRANSFER_FAILED:
                        if(key instanceof NodeCHK) {
                                if(shouldHaveStartedTransfer)
                                        throw new IllegalStateException("Got 
status code "+status+" but transfer not started");
                                shouldHaveStartedTransfer = true;
-                               rs.callbackWhenStatusChange(this, waitStatus);
+                               continue; // should have started transfer
                        }
                        // Other side knows, right?
                        return;
                default:
                    throw new IllegalStateException("Unknown status code 
"+status);
             }
+        }
         } catch (Throwable t) {
             Logger.error(this, "Caught "+t, t);
         } finally {
-               _finally();
+               node.removeTransferringRequestHandler(uid);
+            node.unlockUID(uid, key instanceof NodeSSK, false);
+            if((!finalTransferFailed) && rs != null && status != 
RequestSender.TIMED_OUT && status != RequestSender.GENERATED_REJECTED_OVERLOAD 
+                       && status != RequestSender.INTERNAL_ERROR) {
+               int sent = rs.getTotalSentBytes() + sentBytes;
+               int rcvd = rs.getTotalReceivedBytes() + receivedBytes;
+               if(key instanceof NodeSSK) {
+                       if(logMINOR) Logger.minor(this, "Remote SSK fetch cost 
"+sent+ '/' +rcvd+" bytes ("+status+ ')');
+                       
node.nodeStats.remoteSskFetchBytesSentAverage.report(sent);
+                       
node.nodeStats.remoteSskFetchBytesReceivedAverage.report(rcvd);
+                       if(status == RequestSender.SUCCESS) {
+                               
node.nodeStats.successfulSskFetchBytesSentAverage.report(sent);
+                               
node.nodeStats.successfulSskFetchBytesReceivedAverage.report(sent);
+                       }
+               } else {
+                       if(logMINOR) Logger.minor(this, "Remote CHK fetch cost 
"+sent+ '/' +rcvd+" bytes ("+status+ ')');
+                       
node.nodeStats.remoteChkFetchBytesSentAverage.report(sent);
+                       
node.nodeStats.remoteChkFetchBytesReceivedAverage.report(rcvd);
+                       if(status == RequestSender.SUCCESS) {
+                               
node.nodeStats.successfulChkFetchBytesSentAverage.report(sent);
+                               
node.nodeStats.successfulChkFetchBytesReceivedAverage.report(sent);
+                       }
+               }
+            }
+
         }
     }

-       private synchronized void _finally() {
-               if(logMINOR && (currentState != WAIT_FOR_FIRST_REPLY))
-                       Logger.minor(this, "Hmm, wtf ?");
-               currentState = FINISHED;
-               node.removeTransferringRequestHandler(uid);
-        node.unlockUID(uid, key instanceof NodeSSK, false);
-        if((!finalTransferFailed) && rs != null && status != 
RequestSender.TIMED_OUT && status != RequestSender.GENERATED_REJECTED_OVERLOAD 
-                       && status != RequestSender.INTERNAL_ERROR) {
-               int sent = rs.getTotalSentBytes() + sentBytes;
-               int rcvd = rs.getTotalReceivedBytes() + receivedBytes;
-               if(key instanceof NodeSSK) {
-                       if(logMINOR) Logger.minor(this, "Remote SSK fetch cost 
"+sent+ '/' +rcvd+" bytes ("+status+ ')');
-               node.nodeStats.remoteSskFetchBytesSentAverage.report(sent);
-               node.nodeStats.remoteSskFetchBytesReceivedAverage.report(rcvd);
-               if(status == RequestSender.SUCCESS) {
-                       
node.nodeStats.successfulSskFetchBytesSentAverage.report(sent);
-                       
node.nodeStats.successfulSskFetchBytesReceivedAverage.report(sent);
-               }
-               } else {
-                       if(logMINOR) Logger.minor(this, "Remote CHK fetch cost 
"+sent+ '/' +rcvd+" bytes ("+status+ ')');
-               node.nodeStats.remoteChkFetchBytesSentAverage.report(sent);
-               node.nodeStats.remoteChkFetchBytesReceivedAverage.report(rcvd);
-               if(status == RequestSender.SUCCESS) {
-                       
node.nodeStats.successfulChkFetchBytesSentAverage.report(sent);
-                       
node.nodeStats.successfulChkFetchBytesReceivedAverage.report(sent);
-               }
-               }
-        }
-       }
-
        private Message createDataFound(KeyBlock block) {
                if(block instanceof CHKBlock)
                        return DMT.createFNPCHKDataFound(uid, 
block.getRawHeaders());
@@ -277,8 +249,5 @@
        public void sentPayload(int x) {
                node.sentPayload(x);
        }
-       
-    public String toString() {
-        return super.toString()+" for "+uid;
-    }
+
 }

Modified: trunk/freenet/src/freenet/node/RequestSender.java
===================================================================
--- trunk/freenet/src/freenet/node/RequestSender.java   2007-03-28 21:56:36 UTC 
(rev 12416)
+++ trunk/freenet/src/freenet/node/RequestSender.java   2007-03-28 22:12:30 UTC 
(rev 12417)
@@ -519,54 +519,7 @@
         }
     }

-
     /**
-     * Same as waitUntilStatusChange but non blocking
-     * @see waitUntilStatusChange(short mask)
-     */
-    public void callbackWhenStatusChange(final StatusChangeCallback cb, final 
short mask) {
-       
-       if(mask == WAIT_ALL) throw new IllegalArgumentException("Cannot ignore 
all!");
-       
-       final Runnable whenStatusChange = new Runnable(){
-               private boolean isRunning = false;
-               
-               public void run(){
-                       synchronized (this) {
-                                       if(isRunning) return;
-                                       isRunning = true;
-                               }
-                       _realRun();
-                       synchronized (this) {
-                               isRunning = false;      
-                               }
-               }
-               
-               private void _realRun() {
-                       short current;
-                       synchronized (cb) {
-                               current = mask; // If any bits are set already, 
we ignore those states.
-                               
-                               if(hasForwardedRejectedOverload)
-                                       current |= WAIT_REJECTED_OVERLOAD;
-                       
-                               if(prb != null)
-                                       current |= WAIT_TRANSFERRING_DATA;
-                       
-                       if(status != NOT_FINISHED)
-                               current |= WAIT_FINISHED;
-
-                       if(current == mask)
-                               node.ps.queueTimedJob(this, 10000);
-                       }
-               cb.onStatusChange(current);
-               }
-       };
-       
-       whenStatusChange.run();
-    }
-        
-    /**
      * Wait until we have a terminal status code.
      */
     public synchronized void waitUntilFinished() {

Deleted: trunk/freenet/src/freenet/node/StatusChangeCallback.java
===================================================================
--- trunk/freenet/src/freenet/node/StatusChangeCallback.java    2007-03-28 
21:56:36 UTC (rev 12416)
+++ trunk/freenet/src/freenet/node/StatusChangeCallback.java    2007-03-28 
22:12:30 UTC (rev 12417)
@@ -1,8 +0,0 @@
-/* This code is part of Freenet. It is distributed under the GNU General
- * Public License, version 2 (or at your option any later version). See
- * http://www.gnu.org/ for further details of the GPL. */
-package freenet.node;
-
-public interface StatusChangeCallback {
-       public abstract void onStatusChange(short mask);
-}
\ No newline at end of file


Reply via email to