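You need to call .setMatchesDroppedConnection() and
.setMatchesRestartedConnections() on the message filter built in
getNotificationMessageFilter(). The removed waitFor() code caught
DisconnectedException, but the new async filter as written does not appear
to match on a dropped or restarted connection.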
On Wednesday 19 December 2007 19:31, robert at freenetproject.org wrote:
> Author: robert
> Date: 2007-12-19 19:31:05 +0000 (Wed, 19 Dec 2007)
> New Revision: 16733
>
> Modified:
> trunk/freenet/src/freenet/node/CHKInsertSender.java
> Log:
> use message callback rather than hanging onto thread
>
>
> Modified: trunk/freenet/src/freenet/node/CHKInsertSender.java
> ===================================================================
> --- trunk/freenet/src/freenet/node/CHKInsertSender.java 2007-12-19 19:10:21 UTC (rev 16732)
> +++ trunk/freenet/src/freenet/node/CHKInsertSender.java 2007-12-19 19:31:05 UTC (rev 16733)
> @@ -6,6 +6,7 @@
> import java.util.HashSet;
> import java.util.Vector;
>
> +import freenet.io.comm.AsyncMessageFilterCallback;
> import freenet.io.comm.ByteCounter;
> import freenet.io.comm.DMT;
> import freenet.io.comm.DisconnectedException;
> @@ -24,7 +25,7 @@
>
> public final class CHKInsertSender implements Runnable, AnyInsertSender,
ByteCounter {
>
> - private class BackgroundTransfer implements Runnable {
> + private class BackgroundTransfer implements Runnable,
AsyncMessageFilterCallback {
> /** Node we are waiting for response from */
> final PeerNode pn;
> /** We may be sending data to that node */
> @@ -42,6 +43,8 @@
> /** Did it succeed? */
> boolean transferSucceeded;
>
> + long transferCompletedTime;
> +
> BackgroundTransfer(PeerNode pn, PartiallyReceivedBlock prb) {
> this.pn = pn;
> bt = new BlockTransmitter(node.usm, pn, uid, prb,
> node.outputThrottle,
CHKInsertSender.this);
> @@ -56,7 +59,13 @@
> try {
> bt.send(node.executor);
>
> this.completedTransfer(bt.failedDueToOverload());
> -
> this.receivedNotice(waitForReceivedNotification(this));
> + if (pn.isRoutable() && transferSucceeded) {
> + //synch-version:
this.receivedNotice(waitForReceivedNotification(this));
> + //Add ourselves as a listener for the
> longterm completion message of
this transfer, then gracefully exit.
> +
> node.usm.addAsyncFilter(getNotificationMessageFilter(), this);
> + } else {
> + this.receivedNotice(false);
> + }
> } catch (Throwable t) {
> this.completedTransfer(false);
> this.receivedNotice(false);
> @@ -64,10 +73,11 @@
> }
> }
>
> - void completedTransfer(boolean success) {
> + private void completedTransfer(boolean success) {
> synchronized(this) {
> transferSucceeded = success;
> completedTransfer = true;
> + transferCompletedTime =
> System.currentTimeMillis();
> notifyAll();
> }
> synchronized(backgroundTransfers) {
> @@ -78,11 +88,15 @@
> }
> }
>
> - void receivedNotice(boolean success) {
> + private void receivedNotice(boolean success) {
> synchronized(this) {
> + if (receivedCompletionNotice) {
> + Logger.error(this,
> "receivedNotice("+success+"), already had
receivedNotice("+completionSucceeded+")");
> + } else {
> completionSucceeded = success;
> receivedCompletionNotice = true;
> notifyAll();
> + }
> }
> synchronized(backgroundTransfers) {
> backgroundTransfers.notifyAll();
> @@ -92,6 +106,39 @@
> }
> }
>
> + public void onMatched(Message m) {
> + PeerNode pn = (PeerNode) m.getSource();
> + // pn cannot be null, because the filters will prevent
> garbage
collection of the nodes
> +
> + if(this.pn.equals(pn)) {
> + boolean anyTimedOut =
> m.getBoolean(DMT.ANY_TIMED_OUT);
> + if(anyTimedOut) {
> +
> CHKInsertSender.this.setTransferTimedOut();
> + }
> + receivedNotice(!anyTimedOut);
> + } else {
> + Logger.error(this, "received completion notice
> for wrong
node: "+pn+" != "+this.pn);
> + }
> + }
> +
> + public boolean shouldTimeout() {
> + //AFIACS, this will still let the filter timeout, but
> not call
onMatched() twice.
> + return receivedCompletionNotice;
> + }
> +
> + private MessageFilter getNotificationMessageFilter() {
> + return MessageFilter.create().setField(DMT.UID,
uid).setType(DMT.FNPInsertTransfersCompleted).setSource(pn).setTimeout(TRANSFER_COMPLETION_ACK_TIMEOUT);
> + }
> +
> + boolean isTimedOut() {
> + return
System.currentTimeMillis()>(transferCompletedTime+TRANSFER_COMPLETION_ACK_TIMEOUT);
> + }
> +
> + public void maybeTimedOut() {
> + if (isTimedOut()) {
> + receivedNotice(false);
> + }
> + }
> }
>
> CHKInsertSender(NodeCHK myKey, long uid, byte[] headers, short htl,
> @@ -651,83 +698,7 @@
> }
> }
> }
> -
> - /**
> - * Blocks and waits for a response from the given node asto the final
transfer status in the chain. This will be longer/after
> - * the local block transfer is complete, as it is neccesary to include
> the
rount-trip-time in the allTransfersComplete()
> - * function.
> - * Returns true if received a successful notification of the downstream
reception, false in every other case
> - * (e.g. timeout, cancel, receiveFailed, etc).
> - */
> - private boolean waitForReceivedNotification(BackgroundTransfer awc) {
> -
> - long transfersCompletedTime =
> System.currentTimeMillis();
> -
> - // Wait for acknowledgements from each node, or
> timeouts.
> -
> - while(true) {
> -
> - synchronized(backgroundTransfers) {
> - if(receiveFailed) return false;
> - }
> - // First calculate the timeout
> - int timeout;
> - long now = System.currentTimeMillis();
> - timeout = (int)Math.min(Integer.MAX_VALUE,
> (transfersCompletedTime +
TRANSFER_COMPLETION_ACK_TIMEOUT) - now);
> - if(timeout <= 0) {
> - Logger.error(this, "Timed out
> waiting for transfers to complete
on "+uid);
> - setTransferTimedOut();
> - return false;
> - }
> -
> - // If disconnected, ignore.
> - if(!awc.pn.isRoutable()) {
> - Logger.normal(this,
> "Disconnected: "+awc.pn+"
in "+CHKInsertSender.this);
> - return false;
> - }
> - // If transfer failed, probably won't
> be acknowledged.
> - if(!awc.transferSucceeded) {
> - if (logMINOR)
Logger.minor(this, "waitForReceivedNotification: !transferSucceeded ->
false");
> - return false;
> - }
> - // See if redundant.
> - if(awc.receivedCompletionNotice) {
> - return awc.completionSucceeded;
> - }
> -
> - MessageFilter mf =
> -
> MessageFilter.create().setField(DMT.UID,
uid).setType(DMT.FNPInsertTransfersCompleted).setSource(awc.pn).setTimeout(timeout);
>
> - if(logMINOR) Logger.minor(this,
> "Waiting for "+awc.pn.getPeer());
> -
> - Message m;
> - try {
> - m = node.usm.waitFor(mf,
> CHKInsertSender.this);
> - } catch (DisconnectedException e) {
> - Logger.normal(this,
> "Disconnected (on waitFor): "+awc.pn+"
in "+this);
> - return false;
> - }
> - if(m == null) {
> - Logger.error(this, "Timed out
> waiting for a final ack
from: "+awc.pn);
> - return false;
> - } else {
> - PeerNode pn = (PeerNode)
> m.getSource();
> - // pn cannot be null, because
> the filters will prevent garbage
collection of the nodes
> -
> - if(awc.pn.equals(pn)) {
> - boolean
> anyTimedOut = m.getBoolean(DMT.ANY_TIMED_OUT);
> - if(anyTimedOut)
> {
> -
> setTransferTimedOut();
> - }
> - return
> !anyTimedOut;
> - } else {
> -
> Logger.error(this, "received completion notice for wrong
node: "+awc);
> - continue;
> - }
> - }
> - }
> - }
> -
> /**
> * Block until all transfers have reached a final-terminal
> state
(success/failure). On success this means that a
> * successful 'received-notification' has been received.
> @@ -751,6 +722,7 @@
> completedTransfers = false;
> break;
> }
> + transfers[i].maybeTimedOut();
> if
> (!transfers[i].receivedCompletionNotice) {
> //must wait
> completedNotifications = false;
>
> _______________________________________________
> cvs mailing list
> cvs at freenetproject.org
> http://emu.freenetproject.org/cgi-bin/mailman/listinfo/cvs
>
>
-------------- next part --------------
A non-text attachment was scrubbed...
Name: not available
Type: application/pgp-signature
Size: 189 bytes
Desc: not available
URL:
<https://emu.freenetproject.org/pipermail/devl/attachments/20071220/1158e1e3/attachment.pgp>