Author: robert
Date: 2007-12-19 17:27:36 +0000 (Wed, 19 Dec 2007)
New Revision: 16725
Modified:
trunk/freenet/src/freenet/node/CHKInsertSender.java
Log:
Wait for transfer ack's from background transfers independently (might catch
more)
Modified: trunk/freenet/src/freenet/node/CHKInsertSender.java
===================================================================
--- trunk/freenet/src/freenet/node/CHKInsertSender.java 2007-12-19 16:44:52 UTC
(rev 16724)
+++ trunk/freenet/src/freenet/node/CHKInsertSender.java 2007-12-19 17:27:36 UTC
(rev 16725)
@@ -56,8 +56,10 @@
try {
bt.send(node.executor);
this.completedTransfer(bt.failedDueToOverload());
+
this.receivedNotice(waitForReceivedNotification(this));
} catch (Throwable t) {
this.completedTransfer(false);
+ this.receivedNotice(false);
Logger.error(this, "Caught "+t, t);
}
}
@@ -617,12 +619,12 @@
return sentRequest;
}
- public void waitForBackgroundTransferCompletions() {
+ private void waitForBackgroundTransferCompletions() {
try {
freenet.support.Logger.OSThread.logPID(this);
if(logMINOR) Logger.minor(this, "Starting "+this);
- // We are presently at a terminal stage.
+ // We must presently be at such a stage that no more
background transfers will be added.
BackgroundTransfer[] transfers;
synchronized(backgroundTransfers) {
@@ -631,10 +633,26 @@
}
// Wait for the outgoing transfers to complete.
- if(!waitForCompletedTransfers(transfers)) {
+ if(!waitForBackgroundTransfers(transfers)) {
setTransferTimedOut();
return;
}
+ } finally {
+ synchronized(CHKInsertSender.this) {
+ allTransfersCompleted = true;
+ CHKInsertSender.this.notifyAll();
+ }
+ }
+ }
+
+ /**
+ * Blocks and waits for a response from the given node asto the final
transfer status in the chain. This will be longer/after
+ * the local block transfer is complete, as it is neccesary to include
the rount-trip-time in the allTransfersComplete()
+ * function.
+ * Returns true if received a successful notification of the downstream
reception, false in every other case
+ * (e.g. timeout, cancel, receiveFailed, etc).
+ */
+ private boolean waitForReceivedNotification(BackgroundTransfer awc) {
long transfersCompletedTime =
System.currentTimeMillis();
@@ -643,7 +661,7 @@
while(true) {
synchronized(backgroundTransfers) {
- if(receiveFailed) return;
+ if(receiveFailed) return false;
}
// First calculate the timeout
int timeout;
@@ -652,104 +670,92 @@
if(timeout <= 0) {
Logger.error(this, "Timed out
waiting for transfers to complete on "+uid);
setTransferTimedOut();
- return;
+ return false;
}
- MessageFilter mf = null;
-
- //Build a message filter to capture
acknowledgement messages from the nodes we are interested in.
- for(int i=0;i<transfers.length;i++) {
- BackgroundTransfer awc = transfers[i];
// If disconnected, ignore.
if(!awc.pn.isRoutable()) {
Logger.normal(this,
"Disconnected: "+awc.pn+" in "+CHKInsertSender.this);
- continue;
+ return false;
}
// If transfer failed, probably won't
be acknowledged.
if(!awc.transferSucceeded) {
- continue;
+ if (logMINOR)
Logger.minor(this, "waitForReceivedNotification: !transferSucceeded -> false");
+ return false;
}
- // Wait for completion.
- if(!awc.receivedCompletionNotice) {
- MessageFilter m =
+ // See if redundant.
+ if(awc.receivedCompletionNotice) {
+ return awc.completionSucceeded;
+ }
+
+ MessageFilter mf =
MessageFilter.create().setField(DMT.UID,
uid).setType(DMT.FNPInsertTransfersCompleted).setSource(awc.pn).setTimeout(timeout);
- if(mf == null)
- mf = m;
- else
- mf = m.or(mf);
+
if(logMINOR) Logger.minor(this,
"Waiting for "+awc.pn.getPeer());
- }
- }
- if (mf==null) {
- if (logMINOR) Logger.minor(this, "Done
waiting, no more completion listeners");
- return;
- } else {
Message m;
try {
m = node.usm.waitFor(mf,
CHKInsertSender.this);
} catch (DisconnectedException e) {
- // Which one? I have no idea.
- // Go around the loop again to
find out.
- continue;
+ Logger.normal(this,
"Disconnected (on waitFor): "+awc.pn+" in "+this);
+ return false;
}
if(m == null) {
- Logger.error(this, "Timed out
waiting for a final ack from any nodes.");
- //Would looping again help? We
could either:
- // (1) loop and notice that
there is no more time left (handling the timeout), or
- // (2) notice that the nodes we
are waiting on are down and possibly exit gracefully (not implemented).
- continue;
+ Logger.error(this, "Timed out
waiting for a final ack from: "+awc.pn);
+ return false;
} else {
- // Process message
PeerNode pn = (PeerNode)
m.getSource();
// pn cannot be null, because
the filters will prevent garbage collection of the nodes
- boolean processed = false;
- for(int
i=0;i<transfers.length;i++) {
- PeerNode p =
transfers[i].pn;
- if(p == pn) {
+
+ if(awc.pn.equals(pn)) {
boolean
anyTimedOut = m.getBoolean(DMT.ANY_TIMED_OUT);
-
transfers[i].receivedNotice(!anyTimedOut);
if(anyTimedOut)
{
setTransferTimedOut();
}
- processed =
true;
- break;
+ return
!anyTimedOut;
+ } else {
+
Logger.error(this, "received completion notice for wrong node: "+awc);
+ continue;
}
- }
- if(!processed) {
- Logger.error(this, "Did
not process message: "+m+" on "+this);
- }
}
}
- }
- } finally {
- synchronized(CHKInsertSender.this) {
- allTransfersCompleted = true;
- CHKInsertSender.this.notifyAll();
- }
- }
}
- /** Block until all transfers have finished. @return True if
there is any point in waiting for acknowledgements. */
- private boolean waitForCompletedTransfers(BackgroundTransfer[]
transfers) {
+ /**
+ * Block until all transfers have reached a final-terminal
state (success/failure). On success this means that a
+ * successful 'received-notification' has been received.
+ * @return True if all background transfers were successful.
+ */
+ private boolean waitForBackgroundTransfers(BackgroundTransfer[]
transfers) {
// MAYBE all done
while(true) {
+ //If we want to be sure to exit as-soon-as the
transfers are done, then we must hold the lock while we check.
+ synchronized(backgroundTransfers) {
+ if(receiveFailed) return false;
+
boolean noneRouteable = true;
boolean completedTransfers = true;
+ boolean completedNotifications = true;
for(int i=0;i<transfers.length;i++) {
if(!transfers[i].pn.isRoutable())
continue;
noneRouteable = false;
if(!transfers[i].completedTransfer) {
+ //must wait
completedTransfers = false;
break;
}
+ if
(!transfers[i].receivedCompletionNotice) {
+ //must wait
+ completedNotifications = false;
+ break;
+ }
+ if (!transfers[i].completionSucceeded)
+ return false;
}
- if(completedTransfers) return true;
if(noneRouteable) return false;
+ if(completedTransfers &&
completedNotifications) return true;
- synchronized(backgroundTransfers) {
- if(receiveFailed) return false;
- if(logMINOR) Logger.minor(this,
"Waiting for completion");
+ if(logMINOR) Logger.minor(this,
"Waiting for (completion="+!completedTransfers+",
notification="+completedNotifications+")");
try {
backgroundTransfers.wait(100*1000);
} catch (InterruptedException e) {