Author: robert
Date: 2007-12-19 19:31:05 +0000 (Wed, 19 Dec 2007)
New Revision: 16733
Modified:
trunk/freenet/src/freenet/node/CHKInsertSender.java
Log:
use message callback rather than hanging onto thread
Modified: trunk/freenet/src/freenet/node/CHKInsertSender.java
===================================================================
--- trunk/freenet/src/freenet/node/CHKInsertSender.java 2007-12-19 19:10:21 UTC
(rev 16732)
+++ trunk/freenet/src/freenet/node/CHKInsertSender.java 2007-12-19 19:31:05 UTC
(rev 16733)
@@ -6,6 +6,7 @@
import java.util.HashSet;
import java.util.Vector;
+import freenet.io.comm.AsyncMessageFilterCallback;
import freenet.io.comm.ByteCounter;
import freenet.io.comm.DMT;
import freenet.io.comm.DisconnectedException;
@@ -24,7 +25,7 @@
public final class CHKInsertSender implements Runnable, AnyInsertSender,
ByteCounter {
- private class BackgroundTransfer implements Runnable {
+ private class BackgroundTransfer implements Runnable,
AsyncMessageFilterCallback {
/** Node we are waiting for response from */
final PeerNode pn;
/** We may be sending data to that node */
@@ -42,6 +43,8 @@
/** Did it succeed? */
boolean transferSucceeded;
+ long transferCompletedTime;
+
BackgroundTransfer(PeerNode pn, PartiallyReceivedBlock prb) {
this.pn = pn;
bt = new BlockTransmitter(node.usm, pn, uid, prb,
node.outputThrottle, CHKInsertSender.this);
@@ -56,7 +59,13 @@
try {
bt.send(node.executor);
this.completedTransfer(bt.failedDueToOverload());
-
this.receivedNotice(waitForReceivedNotification(this));
+ if (pn.isRoutable() && transferSucceeded) {
+ //synch-version:
this.receivedNotice(waitForReceivedNotification(this));
+ //Add ourselves as a listener for the
longterm completion message of this transfer, then gracefully exit.
+
node.usm.addAsyncFilter(getNotificationMessageFilter(), this);
+ } else {
+ this.receivedNotice(false);
+ }
} catch (Throwable t) {
this.completedTransfer(false);
this.receivedNotice(false);
@@ -64,10 +73,11 @@
}
}
- void completedTransfer(boolean success) {
+ private void completedTransfer(boolean success) {
synchronized(this) {
transferSucceeded = success;
completedTransfer = true;
+ transferCompletedTime =
System.currentTimeMillis();
notifyAll();
}
synchronized(backgroundTransfers) {
@@ -78,11 +88,15 @@
}
}
- void receivedNotice(boolean success) {
+ private void receivedNotice(boolean success) {
synchronized(this) {
+ if (receivedCompletionNotice) {
+ Logger.error(this,
"receivedNotice("+success+"), already had
receivedNotice("+completionSucceeded+")");
+ } else {
completionSucceeded = success;
receivedCompletionNotice = true;
notifyAll();
+ }
}
synchronized(backgroundTransfers) {
backgroundTransfers.notifyAll();
@@ -92,6 +106,39 @@
}
}
+ public void onMatched(Message m) {
+ PeerNode pn = (PeerNode) m.getSource();
+ // pn cannot be null, because the filters will prevent
garbage collection of the nodes
+
+ if(this.pn.equals(pn)) {
+ boolean anyTimedOut =
m.getBoolean(DMT.ANY_TIMED_OUT);
+ if(anyTimedOut) {
+
CHKInsertSender.this.setTransferTimedOut();
+ }
+ receivedNotice(!anyTimedOut);
+ } else {
+ Logger.error(this, "received completion notice
for wrong node: "+pn+" != "+this.pn);
+ }
+ }
+
+ public boolean shouldTimeout() {
+ //AFIACS, this will still let the filter timeout, but
not call onMatched() twice.
+ return receivedCompletionNotice;
+ }
+
+ private MessageFilter getNotificationMessageFilter() {
+ return MessageFilter.create().setField(DMT.UID,
uid).setType(DMT.FNPInsertTransfersCompleted).setSource(pn).setTimeout(TRANSFER_COMPLETION_ACK_TIMEOUT);
+ }
+
+ boolean isTimedOut() {
+ return
System.currentTimeMillis()>(transferCompletedTime+TRANSFER_COMPLETION_ACK_TIMEOUT);
+ }
+
+ public void maybeTimedOut() {
+ if (isTimedOut()) {
+ receivedNotice(false);
+ }
+ }
}
CHKInsertSender(NodeCHK myKey, long uid, byte[] headers, short htl,
@@ -651,83 +698,7 @@
}
}
}
-
- /**
- * Blocks and waits for a response from the given node asto the final
transfer status in the chain. This will be longer/after
- * the local block transfer is complete, as it is neccesary to include
the rount-trip-time in the allTransfersComplete()
- * function.
- * Returns true if received a successful notification of the downstream
reception, false in every other case
- * (e.g. timeout, cancel, receiveFailed, etc).
- */
- private boolean waitForReceivedNotification(BackgroundTransfer awc) {
-
- long transfersCompletedTime =
System.currentTimeMillis();
-
- // Wait for acknowledgements from each node, or
timeouts.
-
- while(true) {
-
- synchronized(backgroundTransfers) {
- if(receiveFailed) return false;
- }
- // First calculate the timeout
- int timeout;
- long now = System.currentTimeMillis();
- timeout = (int)Math.min(Integer.MAX_VALUE,
(transfersCompletedTime + TRANSFER_COMPLETION_ACK_TIMEOUT) - now);
- if(timeout <= 0) {
- Logger.error(this, "Timed out
waiting for transfers to complete on "+uid);
- setTransferTimedOut();
- return false;
- }
-
- // If disconnected, ignore.
- if(!awc.pn.isRoutable()) {
- Logger.normal(this,
"Disconnected: "+awc.pn+" in "+CHKInsertSender.this);
- return false;
- }
- // If transfer failed, probably won't
be acknowledged.
- if(!awc.transferSucceeded) {
- if (logMINOR)
Logger.minor(this, "waitForReceivedNotification: !transferSucceeded -> false");
- return false;
- }
- // See if redundant.
- if(awc.receivedCompletionNotice) {
- return awc.completionSucceeded;
- }
-
- MessageFilter mf =
-
MessageFilter.create().setField(DMT.UID,
uid).setType(DMT.FNPInsertTransfersCompleted).setSource(awc.pn).setTimeout(timeout);
- if(logMINOR) Logger.minor(this,
"Waiting for "+awc.pn.getPeer());
-
- Message m;
- try {
- m = node.usm.waitFor(mf,
CHKInsertSender.this);
- } catch (DisconnectedException e) {
- Logger.normal(this,
"Disconnected (on waitFor): "+awc.pn+" in "+this);
- return false;
- }
- if(m == null) {
- Logger.error(this, "Timed out
waiting for a final ack from: "+awc.pn);
- return false;
- } else {
- PeerNode pn = (PeerNode)
m.getSource();
- // pn cannot be null, because
the filters will prevent garbage collection of the nodes
-
- if(awc.pn.equals(pn)) {
- boolean
anyTimedOut = m.getBoolean(DMT.ANY_TIMED_OUT);
- if(anyTimedOut)
{
-
setTransferTimedOut();
- }
- return
!anyTimedOut;
- } else {
-
Logger.error(this, "received completion notice for wrong node: "+awc);
- continue;
- }
- }
- }
- }
-
/**
* Block until all transfers have reached a final-terminal
state (success/failure). On success this means that a
* successful 'received-notification' has been received.
@@ -751,6 +722,7 @@
completedTransfers = false;
break;
}
+ transfers[i].maybeTimedOut();
if
(!transfers[i].receivedCompletionNotice) {
//must wait
completedNotifications = false;