Author: toad
Date: 2007-04-14 20:25:54 +0000 (Sat, 14 Apr 2007)
New Revision: 12720

Modified:
   trunk/freenet/src/freenet/node/CHKInsertSender.java
Log:
Refactor into something comprehensible.

Modified: trunk/freenet/src/freenet/node/CHKInsertSender.java
===================================================================
--- trunk/freenet/src/freenet/node/CHKInsertSender.java 2007-04-14 20:07:48 UTC 
(rev 12719)
+++ trunk/freenet/src/freenet/node/CHKInsertSender.java 2007-04-14 20:25:54 UTC 
(rev 12720)
@@ -176,10 +176,6 @@
     /** Runnable which waits for completion of all transfers */
     private CompletionWaiter cw;

-    /** Time when all transfers were completed */
-    private long transfersCompletedTime = -1;
-    
-    
     private int status = -1;
     /** Still running */
     static final int NOT_FINISHED = -1;
@@ -642,43 +638,58 @@
                public void run() {
                        if(logMINOR) Logger.minor(this, "Starting "+this);

+                       // Wait for the request to reach a terminal stage.
                        waitForStatus();

-                       while(true) {
-                               AwaitingCompletion[] waiters;
-                               synchronized(nodesWaitingForCompletion) {
-                                       waiters = new 
AwaitingCompletion[nodesWaitingForCompletion.size()];
-                                       waiters = (AwaitingCompletion[]) 
nodesWaitingForCompletion.toArray(waiters);
+                       AwaitingCompletion[] waiters;
+                       synchronized(nodesWaitingForCompletion) {
+                               waiters = new 
AwaitingCompletion[nodesWaitingForCompletion.size()];
+                               waiters = (AwaitingCompletion[]) 
nodesWaitingForCompletion.toArray(waiters);
+                       }
+                       
+                       // Wait for the outgoing transfers to complete.
+                       if(!waitForCompletedTransfers(waiters)) {
+                               synchronized(CHKInsertSender.this) {
+                                       allTransfersCompleted = true;
+                                       transferTimedOut = true; // probably, 
they disconnected
+                                       CHKInsertSender.this.notifyAll();
+                                       return;
                                }
-                               // Because we have completed, no more waiters 
will be added.
+                       }
+                       
+                       long transfersCompletedTime = 
System.currentTimeMillis();
+                       
+                       // Wait for acknowledgements from each node, or 
timeouts.
+                       
+                       while(true) {

                                // First calculate the timeout
-                               
                                int timeout;
-                               boolean noTimeLeft = false;
-                               
                                long now = System.currentTimeMillis();
-                               
                                timeout = (int)Math.min(Integer.MAX_VALUE, 
(transfersCompletedTime + TRANSFER_COMPLETION_ACK_TIMEOUT) - now);
                                if(timeout <= 0) {
-                                       noTimeLeft = true;
-                                       timeout = 1;
+                                       synchronized(CHKInsertSender.this) {
+                                               if(logMINOR) Logger.minor(this, 
"Timed out waiting for transfers to complete on "+uid);
+                                               transferTimedOut = true;
+                                               allTransfersCompleted = true;
+                                               
CHKInsertSender.this.notifyAll();
+                                       }
+                                       return;
                                }

                                MessageFilter mf = null;
-                               boolean waitingForAny = false;
-                               boolean anyNotCompleted = false;
                                for(int i=0;i<waiters.length;i++) {
                                        AwaitingCompletion awc = waiters[i];
+                                       // If disconnected, ignore.
                                        if(!awc.pn.isRoutable()) {
                                                Logger.normal(this, 
"Disconnected: "+awc.pn+" in "+CHKInsertSender.this);
                                                continue;
                                        }
-                                       waitingForAny = true;
-                                       if(!awc.completedTransfer) {
-                                               anyNotCompleted = true;
+                                       // If transfer failed, probably won't 
be acknowledged.
+                                       if(!awc.transferSucceeded) {
                                                continue;
                                        }
+                                       // Wait for completion.
                                        if(!awc.receivedCompletionNotice) {
                                                MessageFilter m =
                                                        
MessageFilter.create().setField(DMT.UID, 
uid).setType(DMT.FNPInsertTransfersCompleted).setSource(awc.pn).setTimeout(timeout);
@@ -691,44 +702,18 @@
                                }

                                if(mf == null) {
-                                       
-                                       if(!waitingForAny) {
-                                               // All are disconnected
-                                               
synchronized(CHKInsertSender.this) {
-                                                       allTransfersCompleted = 
true;
-                                                       
CHKInsertSender.this.notifyAll();
-                                               }
+                                       synchronized(CHKInsertSender.this) {
+                                               allTransfersCompleted = true;
+                                               
CHKInsertSender.this.notifyAll();
                                                return;
                                        }
-                                       
-                                       if(!anyNotCompleted) {
-                                               // All have completed 
transferring, AND all have received completion notices!
-                                               // All done!
-                                               if(logMINOR) Logger.minor(this, 
"Completed, status="+getStatusString()+", nothing left to wait for for "+uid+" 
.");
-                                               
synchronized(CHKInsertSender.this) {
-                                                       allTransfersCompleted = 
true;
-                                                       
CHKInsertSender.this.notifyAll();
-                                               }
-                                               return;
-                                       }
-                                       
-                                       if(waitForCompletedTransfers(waiters, 
timeout, noTimeLeft)) {
-                                               
synchronized(CHKInsertSender.this) {
-                                                       if(logMINOR) 
Logger.minor(this, "All transfers completed (1) on "+uid);
-                                                       allTransfersCompleted = 
true;
-                                                       transfersCompletedTime 
= System.currentTimeMillis();
-                                                       
CHKInsertSender.this.notifyAll();
-                                                       // Now wait for the 
acknowledgements
-                                               }
-                                       }
-                                       continue;
                                } else {
                                        Message m;
                                        try {
                                                m = node.usm.waitFor(mf, 
CHKInsertSender.this);
                                        } catch (DisconnectedException e) {
                                                // Which one? I have no idea.
-                                               // Go around the loop again.
+                                               // Go around the loop again to 
find out.
                                                continue;
                                        }
                                        if(m != null) {
@@ -755,68 +740,37 @@
                                                if(!processed) {
                                                        Logger.error(this, "Did 
not process message: "+m+" on "+this);
                                                }
-                                       } else {
-                                               
if(nodesWaitingForCompletion.size() > waiters.length) {
-                                                       // Added another one
-                                                       if(logMINOR) 
Logger.minor(this, "Looping: waiters="+waiters.length+" but 
waiting="+nodesWaitingForCompletion.size());
-                                                       continue;
-                                               }
-                                               if(noTimeLeft) {
-                                                       if(logMINOR) 
Logger.minor(this, "Overall timeout on "+CHKInsertSender.this);
-                                                       for(int 
i=0;i<waiters.length;i++) {
-                                                               
if(!waiters[i].pn.isRoutable()) continue;
-                                                               
if(!waiters[i].receivedCompletionNotice)
-                                                                       
waiters[i].completed(false, false);
-                                                               
if(!waiters[i].completedTransfer)
-                                                                       
waiters[i].completedTransfer(false);
-                                                       }
-                                                       
synchronized(CHKInsertSender.this) {
-                                                               if(logMINOR) 
Logger.minor(this, "All transfers completed (2) on "+uid);
-                                                               
transferTimedOut = true;
-                                                               
allTransfersCompleted = true;
-                                                               
CHKInsertSender.this.notifyAll();
-                                                       }
-                                                       return;
-                                               }
                                        }
                                }
                        }
                }

-               /** @return True if all transfers have completed, false 
otherwise. */
-               private boolean waitForCompletedTransfers(AwaitingCompletion[] 
waiters, int timeout, boolean noTimeLeft) {
+               /** Block until all transfers have finished. @return True if 
there is any point in waiting for acknowledgements. */
+               private boolean waitForCompletedTransfers(AwaitingCompletion[] 
waiters) {
                        // MAYBE all done
-                       boolean completedTransfers = true;
-                       synchronized(nodesWaitingForCompletion) {
+                       while(true) {
+                               boolean noneRouteable = true;
+                               boolean completedTransfers = true;
                                for(int i=0;i<waiters.length;i++) {
                                        if(!waiters[i].pn.isRoutable()) 
continue;
+                                       noneRouteable = false;
                                        if(!waiters[i].completedTransfer) {
                                                completedTransfers = false;
                                                break;
                                        }
                                }
-                               if(!completedTransfers) {
+                               if(completedTransfers) return true;
+                               if(noneRouteable) return false;
+
+                               synchronized(nodesWaitingForCompletion) {
+                                       if(logMINOR) Logger.minor(this, 
"Waiting for completion");
                                        try {
-                                               if(!noTimeLeft) {
-                                                       if(logMINOR) 
Logger.minor(this, "Waiting for completion ("+timeout+"ms)");
-                                                       
nodesWaitingForCompletion.wait(timeout);
-                                               } else {
-                                                       // Timed out
-                                               }
-                                               completedTransfers = true;
-                                               for(int 
i=0;i<waiters.length;i++) {
-                                                       
if(!waiters[i].pn.isRoutable()) continue;
-                                                       
if(!waiters[i].completedTransfer) {
-                                                               
completedTransfers = false;
-                                                               break;
-                                                       }
-                                               }
+                                               
nodesWaitingForCompletion.wait(100*1000);
                                        } catch (InterruptedException e) {
                                                // Ignore
                                        }
-                               }
+                               }                               
                        }
-                       return completedTransfers;
                }

                public String toString() {


Reply via email to