Author: robert
Date: 2007-12-19 19:31:05 +0000 (Wed, 19 Dec 2007)
New Revision: 16733

Modified:
   trunk/freenet/src/freenet/node/CHKInsertSender.java
Log:
use message callback rather than hanging onto thread


Modified: trunk/freenet/src/freenet/node/CHKInsertSender.java
===================================================================
--- trunk/freenet/src/freenet/node/CHKInsertSender.java 2007-12-19 19:10:21 UTC 
(rev 16732)
+++ trunk/freenet/src/freenet/node/CHKInsertSender.java 2007-12-19 19:31:05 UTC 
(rev 16733)
@@ -6,6 +6,7 @@
 import java.util.HashSet;
 import java.util.Vector;

+import freenet.io.comm.AsyncMessageFilterCallback;
 import freenet.io.comm.ByteCounter;
 import freenet.io.comm.DMT;
 import freenet.io.comm.DisconnectedException;
@@ -24,7 +25,7 @@

 public final class CHKInsertSender implements Runnable, AnyInsertSender, 
ByteCounter {

-       private class BackgroundTransfer implements Runnable {
+       private class BackgroundTransfer implements Runnable, 
AsyncMessageFilterCallback {
                /** Node we are waiting for response from */
                final PeerNode pn;
                /** We may be sending data to that node */
@@ -42,6 +43,8 @@
                /** Did it succeed? */
                boolean transferSucceeded;

+               long transferCompletedTime;
+               
                BackgroundTransfer(PeerNode pn, PartiallyReceivedBlock prb) {
                        this.pn = pn;
                        bt = new BlockTransmitter(node.usm, pn, uid, prb, 
node.outputThrottle, CHKInsertSender.this);
@@ -56,7 +59,13 @@
                        try {
                                bt.send(node.executor);
                                
this.completedTransfer(bt.failedDueToOverload());
-                               
this.receivedNotice(waitForReceivedNotification(this));
+                               if (pn.isRoutable() && transferSucceeded) {
+                                       //synch-version: 
this.receivedNotice(waitForReceivedNotification(this));
+                                       //Add ourselves as a listener for the 
longterm completion message of this transfer, then gracefully exit.
+                                       
node.usm.addAsyncFilter(getNotificationMessageFilter(), this);
+                               } else {
+                                       this.receivedNotice(false);
+                               }
                        } catch (Throwable t) {
                                this.completedTransfer(false);
                                this.receivedNotice(false);
@@ -64,10 +73,11 @@
                        }
                }

-               void completedTransfer(boolean success) {
+               private void completedTransfer(boolean success) {
                        synchronized(this) {
                                transferSucceeded = success;
                                completedTransfer = true;
+                               transferCompletedTime = 
System.currentTimeMillis();
                                notifyAll();
                        }
                        synchronized(backgroundTransfers) {
@@ -78,11 +88,15 @@
                        }
                }

-               void receivedNotice(boolean success) {
+               private void receivedNotice(boolean success) {
                        synchronized(this) {
+                               if (receivedCompletionNotice) {
+                                       Logger.error(this, 
"receivedNotice("+success+"), already had 
receivedNotice("+completionSucceeded+")");
+                               } else {
                                completionSucceeded = success;
                                receivedCompletionNotice = true;
                                notifyAll();
+                               }
                        }
                        synchronized(backgroundTransfers) {
                                backgroundTransfers.notifyAll();
@@ -92,6 +106,39 @@
                        }                       
                }

+               public void onMatched(Message m) {
+                       PeerNode pn = (PeerNode) m.getSource();
+                       // pn cannot be null, because the filters will prevent 
garbage collection of the nodes
+                       
+                       if(this.pn.equals(pn)) {
+                               boolean anyTimedOut = 
m.getBoolean(DMT.ANY_TIMED_OUT);
+                               if(anyTimedOut) {
+                                       
CHKInsertSender.this.setTransferTimedOut();
+                               }
+                               receivedNotice(!anyTimedOut);
+                       } else {
+                               Logger.error(this, "received completion notice 
for wrong node: "+pn+" != "+this.pn);
+                       }                       
+               }
+               
+               public boolean shouldTimeout() {
+                       //AFIACS, this will still let the filter timeout, but 
not call onMatched() twice.
+                       return receivedCompletionNotice;
+               }
+               
+               private MessageFilter getNotificationMessageFilter() {
+                       return MessageFilter.create().setField(DMT.UID, 
uid).setType(DMT.FNPInsertTransfersCompleted).setSource(pn).setTimeout(TRANSFER_COMPLETION_ACK_TIMEOUT);
+               }
+               
+               boolean isTimedOut() {
+                       return 
System.currentTimeMillis()>(transferCompletedTime+TRANSFER_COMPLETION_ACK_TIMEOUT);
+               }
+               
+               public void maybeTimedOut() {
+                       if (isTimedOut()) {
+                               receivedNotice(false);
+                       }
+               }
        }

        CHKInsertSender(NodeCHK myKey, long uid, byte[] headers, short htl, 
@@ -651,83 +698,7 @@
                                }
                        }
                }
-       
-       /**
-        * Blocks and waits for a response from the given node asto the final 
transfer status in the chain. This will be longer/after
-        * the local block transfer is complete, as it is neccesary to include 
the rount-trip-time in the allTransfersComplete()
-        * function.
-        * Returns true if received a successful notification of the downstream 
reception, false in every other case
-        * (e.g. timeout, cancel, receiveFailed, etc).
-        */
-       private boolean waitForReceivedNotification(BackgroundTransfer awc) {
-                               
-                       long transfersCompletedTime = 
System.currentTimeMillis();
-                       
-                       // Wait for acknowledgements from each node, or 
timeouts.
-                       
-                       while(true) {
-                               
-                               synchronized(backgroundTransfers) {
-                                       if(receiveFailed) return false;
-                               }
-                               // First calculate the timeout
-                               int timeout;
-                               long now = System.currentTimeMillis();
-                               timeout = (int)Math.min(Integer.MAX_VALUE, 
(transfersCompletedTime + TRANSFER_COMPLETION_ACK_TIMEOUT) - now);
-                               if(timeout <= 0) {
-                                               Logger.error(this, "Timed out 
waiting for transfers to complete on "+uid);
-                                               setTransferTimedOut();
-                                       return false;
-                               }
-                               
-                                       // If disconnected, ignore.
-                                       if(!awc.pn.isRoutable()) {
-                                               Logger.normal(this, 
"Disconnected: "+awc.pn+" in "+CHKInsertSender.this);
-                                               return false;
-                                       }
-                                       // If transfer failed, probably won't 
be acknowledged.
-                                       if(!awc.transferSucceeded) {
-                                               if (logMINOR) 
Logger.minor(this, "waitForReceivedNotification: !transferSucceeded -> false");
-                                               return false;
-                                       }
-                                       // See if redundant.
-                                       if(awc.receivedCompletionNotice) {
-                                               return awc.completionSucceeded;
-                                       }
-                               
-                               MessageFilter mf =
-                                                       
MessageFilter.create().setField(DMT.UID, 
uid).setType(DMT.FNPInsertTransfersCompleted).setSource(awc.pn).setTimeout(timeout);

-                                               if(logMINOR) Logger.minor(this, 
"Waiting for "+awc.pn.getPeer());
-                               
-                                       Message m;
-                                       try {
-                                               m = node.usm.waitFor(mf, 
CHKInsertSender.this);
-                                       } catch (DisconnectedException e) {
-                                               Logger.normal(this, 
"Disconnected (on waitFor): "+awc.pn+" in "+this);
-                                               return false;
-                                       }
-                                       if(m == null) {
-                                               Logger.error(this, "Timed out 
waiting for a final ack from: "+awc.pn);
-                                               return false;
-                                       } else {
-                                               PeerNode pn = (PeerNode) 
m.getSource();
-                                               // pn cannot be null, because 
the filters will prevent garbage collection of the nodes
-
-                                                       if(awc.pn.equals(pn)) {
-                                                               boolean 
anyTimedOut = m.getBoolean(DMT.ANY_TIMED_OUT);
-                                                               if(anyTimedOut) 
{
-                                                                       
setTransferTimedOut();
-                                                               }
-                                                               return 
!anyTimedOut;
-                                                       } else {
-                                                               
Logger.error(this, "received completion notice for wrong node: "+awc);
-                                                               continue;
-                                                       }
-                                       }
-                               }
-               }
-
                /**
                 * Block until all transfers have reached a final-terminal 
state (success/failure). On success this means that a
                 * successful 'received-notification' has been received.
@@ -751,6 +722,7 @@
                                                completedTransfers = false;
                                                break;
                                        }
+                                       transfers[i].maybeTimedOut();
                                        if 
(!transfers[i].receivedCompletionNotice) {
                                                //must wait
                                                completedNotifications = false;


Reply via email to