Author: robert
Date: 2007-12-19 17:27:36 +0000 (Wed, 19 Dec 2007)
New Revision: 16725

Modified:
   trunk/freenet/src/freenet/node/CHKInsertSender.java
Log:
Wait for transfer ack's from background transfers independently (might catch 
more)


Modified: trunk/freenet/src/freenet/node/CHKInsertSender.java
===================================================================
--- trunk/freenet/src/freenet/node/CHKInsertSender.java 2007-12-19 16:44:52 UTC 
(rev 16724)
+++ trunk/freenet/src/freenet/node/CHKInsertSender.java 2007-12-19 17:27:36 UTC 
(rev 16725)
@@ -56,8 +56,10 @@
                        try {
                                bt.send(node.executor);
                                
this.completedTransfer(bt.failedDueToOverload());
+                               
this.receivedNotice(waitForReceivedNotification(this));
                        } catch (Throwable t) {
                                this.completedTransfer(false);
+                               this.receivedNotice(false);
                                Logger.error(this, "Caught "+t, t);
                        }
                }
@@ -617,12 +619,12 @@
                return sentRequest;
        }

-               public void waitForBackgroundTransferCompletions() {
+               private void waitForBackgroundTransferCompletions() {
                        try {
                    freenet.support.Logger.OSThread.logPID(this);
                        if(logMINOR) Logger.minor(this, "Starting "+this);

-                       // We are presently at a terminal stage.
+                       // We must presently be at such a stage that no more 
background transfers will be added.

                        BackgroundTransfer[] transfers;
                        synchronized(backgroundTransfers) {
@@ -631,10 +633,26 @@
                        }

                        // Wait for the outgoing transfers to complete.
-                       if(!waitForCompletedTransfers(transfers)) {
+                       if(!waitForBackgroundTransfers(transfers)) {
                                setTransferTimedOut();
                                return;
                        }
+                       } finally {
+                               synchronized(CHKInsertSender.this) {
+                                       allTransfersCompleted = true;
+                                       CHKInsertSender.this.notifyAll();
+                               }
+                       }
+               }
+       
+       /**
+        * Blocks and waits for a response from the given node asto the final 
transfer status in the chain. This will be longer/after
+        * the local block transfer is complete, as it is neccesary to include 
the rount-trip-time in the allTransfersComplete()
+        * function.
+        * Returns true if received a successful notification of the downstream 
reception, false in every other case
+        * (e.g. timeout, cancel, receiveFailed, etc).
+        */
+       private boolean waitForReceivedNotification(BackgroundTransfer awc) {

                        long transfersCompletedTime = 
System.currentTimeMillis();

@@ -643,7 +661,7 @@
                        while(true) {

                                synchronized(backgroundTransfers) {
-                                       if(receiveFailed) return;
+                                       if(receiveFailed) return false;
                                }
                                // First calculate the timeout
                                int timeout;
@@ -652,104 +670,92 @@
                                if(timeout <= 0) {
                                                Logger.error(this, "Timed out 
waiting for transfers to complete on "+uid);
                                                setTransferTimedOut();
-                                       return;
+                                       return false;
                                }

-                               MessageFilter mf = null;
-                               
-                               //Build a message filter to capture 
acknowledgement messages from the nodes we are interested in.
-                               for(int i=0;i<transfers.length;i++) {
-                                       BackgroundTransfer awc = transfers[i];
                                        // If disconnected, ignore.
                                        if(!awc.pn.isRoutable()) {
                                                Logger.normal(this, 
"Disconnected: "+awc.pn+" in "+CHKInsertSender.this);
-                                               continue;
+                                               return false;
                                        }
                                        // If transfer failed, probably won't 
be acknowledged.
                                        if(!awc.transferSucceeded) {
-                                               continue;
+                                               if (logMINOR) 
Logger.minor(this, "waitForReceivedNotification: !transferSucceeded -> false");
+                                               return false;
                                        }
-                                       // Wait for completion.
-                                       if(!awc.receivedCompletionNotice) {
-                                               MessageFilter m =
+                                       // See if redundant.
+                                       if(awc.receivedCompletionNotice) {
+                                               return awc.completionSucceeded;
+                                       }
+                               
+                               MessageFilter mf =
                                                        
MessageFilter.create().setField(DMT.UID, 
uid).setType(DMT.FNPInsertTransfersCompleted).setSource(awc.pn).setTimeout(timeout);
-                                               if(mf == null)
-                                                       mf = m;
-                                               else
-                                                       mf = m.or(mf);
+
                                                if(logMINOR) Logger.minor(this, 
"Waiting for "+awc.pn.getPeer());
-                                       }
-                               }

-                               if (mf==null) {
-                                       if (logMINOR) Logger.minor(this, "Done 
waiting, no more completion listeners");
-                                       return;
-                               } else {
                                        Message m;
                                        try {
                                                m = node.usm.waitFor(mf, 
CHKInsertSender.this);
                                        } catch (DisconnectedException e) {
-                                               // Which one? I have no idea.
-                                               // Go around the loop again to 
find out.
-                                               continue;
+                                               Logger.normal(this, 
"Disconnected (on waitFor): "+awc.pn+" in "+this);
+                                               return false;
                                        }
                                        if(m == null) {
-                                               Logger.error(this, "Timed out 
waiting for a final ack from any nodes.");
-                                               //Would looping again help? We 
could either:
-                                               // (1) loop and notice that 
there is no more time left (handling the timeout), or
-                                               // (2) notice that the nodes we 
are waiting on are down and possibly exit gracefully (not implemented).
-                                               continue;
+                                               Logger.error(this, "Timed out 
waiting for a final ack from: "+awc.pn);
+                                               return false;
                                        } else {
-                                               // Process message
                                                PeerNode pn = (PeerNode) 
m.getSource();
                                                // pn cannot be null, because 
the filters will prevent garbage collection of the nodes
-                                               boolean processed = false;
-                                               for(int 
i=0;i<transfers.length;i++) {
-                                                       PeerNode p = 
transfers[i].pn;
-                                                       if(p == pn) {
+
+                                                       if(awc.pn.equals(pn)) {
                                                                boolean 
anyTimedOut = m.getBoolean(DMT.ANY_TIMED_OUT);
-                                                               
transfers[i].receivedNotice(!anyTimedOut);
                                                                if(anyTimedOut) 
{
                                                                        
setTransferTimedOut();
                                                                }
-                                                               processed = 
true;
-                                                               break;
+                                                               return 
!anyTimedOut;
+                                                       } else {
+                                                               
Logger.error(this, "received completion notice for wrong node: "+awc);
+                                                               continue;
                                                        }
-                                               }
-                                               if(!processed) {
-                                                       Logger.error(this, "Did 
not process message: "+m+" on "+this);
-                                               }
                                        }
                                }
-                       }
-                       } finally {
-                               synchronized(CHKInsertSender.this) {
-                                       allTransfersCompleted = true;
-                                       CHKInsertSender.this.notifyAll();
-                               }
-                       }
                }

-               /** Block until all transfers have finished. @return True if 
there is any point in waiting for acknowledgements. */
-               private boolean waitForCompletedTransfers(BackgroundTransfer[] 
transfers) {
+               /**
+                * Block until all transfers have reached a final-terminal 
state (success/failure). On success this means that a
+                * successful 'received-notification' has been received.
+                * @return True if all background transfers were successful.
+                */
+               private boolean waitForBackgroundTransfers(BackgroundTransfer[] 
transfers) {
                        // MAYBE all done
                        while(true) {
+                               //If we want to be sure to exit as-soon-as the 
transfers are done, then we must hold the lock while we check.
+                               synchronized(backgroundTransfers) {
+                                       if(receiveFailed) return false;
+
                                boolean noneRouteable = true;
                                boolean completedTransfers = true;
+                               boolean completedNotifications = true;
                                for(int i=0;i<transfers.length;i++) {
                                        if(!transfers[i].pn.isRoutable()) 
continue;
                                        noneRouteable = false;
                                        if(!transfers[i].completedTransfer) {
+                                               //must wait
                                                completedTransfers = false;
                                                break;
                                        }
+                                       if 
(!transfers[i].receivedCompletionNotice) {
+                                               //must wait
+                                               completedNotifications = false;
+                                               break;
+                                       }
+                                       if (!transfers[i].completionSucceeded)
+                                               return false;
                                }
-                               if(completedTransfers) return true;
                                if(noneRouteable) return false;
+                               if(completedTransfers && 
completedNotifications) return true;

-                               synchronized(backgroundTransfers) {
-                                       if(receiveFailed) return false;
-                                       if(logMINOR) Logger.minor(this, 
"Waiting for completion");
+                                       if(logMINOR) Logger.minor(this, 
"Waiting for (completion="+!completedTransfers+", 
notification="+completedNotifications+")");
                                        try {
                                                
backgroundTransfers.wait(100*1000);
                                        } catch (InterruptedException e) {


Reply via email to