Author: toad Date: 2007-04-14 20:25:54 +0000 (Sat, 14 Apr 2007) New Revision: 12720
Modified:
trunk/freenet/src/freenet/node/CHKInsertSender.java
Log:
Refactor into something comprehensible.
Modified: trunk/freenet/src/freenet/node/CHKInsertSender.java
===================================================================
--- trunk/freenet/src/freenet/node/CHKInsertSender.java 2007-04-14 20:07:48 UTC
(rev 12719)
+++ trunk/freenet/src/freenet/node/CHKInsertSender.java 2007-04-14 20:25:54 UTC
(rev 12720)
@@ -176,10 +176,6 @@
/** Runnable which waits for completion of all transfers */
private CompletionWaiter cw;
- /** Time when all transfers were completed */
- private long transfersCompletedTime = -1;
-
-
private int status = -1;
/** Still running */
static final int NOT_FINISHED = -1;
@@ -642,43 +638,58 @@
public void run() {
if(logMINOR) Logger.minor(this, "Starting "+this);
+ // Wait for the request to reach a terminal stage.
waitForStatus();
- while(true) {
- AwaitingCompletion[] waiters;
- synchronized(nodesWaitingForCompletion) {
- waiters = new
AwaitingCompletion[nodesWaitingForCompletion.size()];
- waiters = (AwaitingCompletion[])
nodesWaitingForCompletion.toArray(waiters);
+ AwaitingCompletion[] waiters;
+ synchronized(nodesWaitingForCompletion) {
+ waiters = new
AwaitingCompletion[nodesWaitingForCompletion.size()];
+ waiters = (AwaitingCompletion[])
nodesWaitingForCompletion.toArray(waiters);
+ }
+
+ // Wait for the outgoing transfers to complete.
+ if(!waitForCompletedTransfers(waiters)) {
+ synchronized(CHKInsertSender.this) {
+ allTransfersCompleted = true;
+ transferTimedOut = true; // probably,
they disconnected
+ CHKInsertSender.this.notifyAll();
+ return;
}
- // Because we have completed, no more waiters
will be added.
+ }
+
+ long transfersCompletedTime =
System.currentTimeMillis();
+
+ // Wait for acknowledgements from each node, or
timeouts.
+
+ while(true) {
// First calculate the timeout
-
int timeout;
- boolean noTimeLeft = false;
-
long now = System.currentTimeMillis();
-
timeout = (int)Math.min(Integer.MAX_VALUE,
(transfersCompletedTime + TRANSFER_COMPLETION_ACK_TIMEOUT) - now);
if(timeout <= 0) {
- noTimeLeft = true;
- timeout = 1;
+ synchronized(CHKInsertSender.this) {
+ if(logMINOR) Logger.minor(this,
"Timed out waiting for transfers to complete on "+uid);
+ transferTimedOut = true;
+ allTransfersCompleted = true;
+
CHKInsertSender.this.notifyAll();
+ }
+ return;
}
MessageFilter mf = null;
- boolean waitingForAny = false;
- boolean anyNotCompleted = false;
for(int i=0;i<waiters.length;i++) {
AwaitingCompletion awc = waiters[i];
+ // If disconnected, ignore.
if(!awc.pn.isRoutable()) {
Logger.normal(this,
"Disconnected: "+awc.pn+" in "+CHKInsertSender.this);
continue;
}
- waitingForAny = true;
- if(!awc.completedTransfer) {
- anyNotCompleted = true;
+ // If transfer failed, probably won't
be acknowledged.
+ if(!awc.transferSucceeded) {
continue;
}
+ // Wait for completion.
if(!awc.receivedCompletionNotice) {
MessageFilter m =
MessageFilter.create().setField(DMT.UID,
uid).setType(DMT.FNPInsertTransfersCompleted).setSource(awc.pn).setTimeout(timeout);
@@ -691,44 +702,18 @@
}
if(mf == null) {
-
- if(!waitingForAny) {
- // All are disconnected
-
synchronized(CHKInsertSender.this) {
- allTransfersCompleted =
true;
-
CHKInsertSender.this.notifyAll();
- }
+ synchronized(CHKInsertSender.this) {
+ allTransfersCompleted = true;
+
CHKInsertSender.this.notifyAll();
return;
}
-
- if(!anyNotCompleted) {
- // All have completed
transferring, AND all have received completion notices!
- // All done!
- if(logMINOR) Logger.minor(this,
"Completed, status="+getStatusString()+", nothing left to wait for for "+uid+"
.");
-
synchronized(CHKInsertSender.this) {
- allTransfersCompleted =
true;
-
CHKInsertSender.this.notifyAll();
- }
- return;
- }
-
- if(waitForCompletedTransfers(waiters,
timeout, noTimeLeft)) {
-
synchronized(CHKInsertSender.this) {
- if(logMINOR)
Logger.minor(this, "All transfers completed (1) on "+uid);
- allTransfersCompleted =
true;
- transfersCompletedTime
= System.currentTimeMillis();
-
CHKInsertSender.this.notifyAll();
- // Now wait for the
acknowledgements
- }
- }
- continue;
} else {
Message m;
try {
m = node.usm.waitFor(mf,
CHKInsertSender.this);
} catch (DisconnectedException e) {
// Which one? I have no idea.
- // Go around the loop again.
+ // Go around the loop again to
find out.
continue;
}
if(m != null) {
@@ -755,68 +740,37 @@
if(!processed) {
Logger.error(this, "Did
not process message: "+m+" on "+this);
}
- } else {
-
if(nodesWaitingForCompletion.size() > waiters.length) {
- // Added another one
- if(logMINOR)
Logger.minor(this, "Looping: waiters="+waiters.length+" but
waiting="+nodesWaitingForCompletion.size());
- continue;
- }
- if(noTimeLeft) {
- if(logMINOR)
Logger.minor(this, "Overall timeout on "+CHKInsertSender.this);
- for(int
i=0;i<waiters.length;i++) {
-
if(!waiters[i].pn.isRoutable()) continue;
-
if(!waiters[i].receivedCompletionNotice)
-
waiters[i].completed(false, false);
-
if(!waiters[i].completedTransfer)
-
waiters[i].completedTransfer(false);
- }
-
synchronized(CHKInsertSender.this) {
- if(logMINOR)
Logger.minor(this, "All transfers completed (2) on "+uid);
-
transferTimedOut = true;
-
allTransfersCompleted = true;
-
CHKInsertSender.this.notifyAll();
- }
- return;
- }
}
}
}
}
- /** @return True if all transfers have completed, false
otherwise. */
- private boolean waitForCompletedTransfers(AwaitingCompletion[]
waiters, int timeout, boolean noTimeLeft) {
+ /** Block until all transfers have finished. @return True if
there is any point in waiting for acknowledgements. */
+ private boolean waitForCompletedTransfers(AwaitingCompletion[]
waiters) {
// MAYBE all done
- boolean completedTransfers = true;
- synchronized(nodesWaitingForCompletion) {
+ while(true) {
+ boolean noneRouteable = true;
+ boolean completedTransfers = true;
for(int i=0;i<waiters.length;i++) {
if(!waiters[i].pn.isRoutable())
continue;
+ noneRouteable = false;
if(!waiters[i].completedTransfer) {
completedTransfers = false;
break;
}
}
- if(!completedTransfers) {
+ if(completedTransfers) return true;
+ if(noneRouteable) return false;
+
+ synchronized(nodesWaitingForCompletion) {
+ if(logMINOR) Logger.minor(this,
"Waiting for completion");
try {
- if(!noTimeLeft) {
- if(logMINOR)
Logger.minor(this, "Waiting for completion ("+timeout+"ms)");
-
nodesWaitingForCompletion.wait(timeout);
- } else {
- // Timed out
- }
- completedTransfers = true;
- for(int
i=0;i<waiters.length;i++) {
-
if(!waiters[i].pn.isRoutable()) continue;
-
if(!waiters[i].completedTransfer) {
-
completedTransfers = false;
- break;
- }
- }
+
nodesWaitingForCompletion.wait(100*1000);
} catch (InterruptedException e) {
// Ignore
}
- }
+ }
}
- return completedTransfers;
}
public String toString() {
