Author: toad
Date: 2007-05-31 22:54:42 +0000 (Thu, 31 May 2007)
New Revision: 13426
Modified:
trunk/freenet/src/freenet/io/comm/UdpSocketManager.java
trunk/freenet/src/freenet/io/xfer/BulkTransmitter.java
trunk/freenet/src/freenet/io/xfer/PartiallyReceivedBulk.java
Log:
Finish async API, use it to wait for cancellation in BulkTransmitter.
Modified: trunk/freenet/src/freenet/io/comm/UdpSocketManager.java
===================================================================
--- trunk/freenet/src/freenet/io/comm/UdpSocketManager.java 2007-05-31
22:40:13 UTC (rev 13425)
+++ trunk/freenet/src/freenet/io/comm/UdpSocketManager.java 2007-05-31
22:54:42 UTC (rev 13426)
@@ -471,6 +471,74 @@
}
}
+
+ public void addAsyncFilter(MessageFilter filter,
AsyncMessageFilterCallback callback) throws DisconnectedException {
+ filter.setAsyncCallback(callback);
+ boolean logDEBUG = Logger.shouldLog(Logger.DEBUG, this);
+ if(logDEBUG) Logger.debug(this, "Adding async filter "+filter+"
for "+callback);
+ Message ret = null;
+ if((lowLevelFilter != null) && (filter._source != null) &&
+ filter.matchesDroppedConnection() &&
+ lowLevelFilter.isDisconnected(filter._source))
+ throw new DisconnectedException();
+ // Check to see whether the filter matches any of the recently
_unclaimed messages
+ // Drop any _unclaimed messages that the filter doesn't match
that are also older than MAX_UNCLAIMED_FIFO_ITEM_LIFETIME
+ long now = System.currentTimeMillis();
+ long messageDropTime = now - MAX_UNCLAIMED_FIFO_ITEM_LIFETIME;
+ long messageLifeTime = 0;
+ synchronized (_filters) {
+ if(logMINOR) Logger.minor(this, "Checking _unclaimed");
+ for (ListIterator i = _unclaimed.listIterator();
i.hasNext();) {
+ Message m = (Message) i.next();
+ if (filter.match(m)) {
+ i.remove();
+ ret = m;
+ if(logMINOR) Logger.debug(this,
"Matching from _unclaimed");
+ break;
+ } else if (m.localInstantiationTime <
messageDropTime) {
+ i.remove();
+ messageLifeTime = now -
m.localInstantiationTime;
+ if ((m.getSource()) instanceof
PeerNode) {
+ Logger.normal(this, "Dropping
unclaimed from "+m.getSource().getPeer()+", lived
"+TimeUtil.formatTime(messageLifeTime, 2, true)+" (age)"+": "+m);
+ } else {
+ Logger.normal(this, "Dropping
unclaimed, lived "+TimeUtil.formatTime(messageLifeTime, 2, true)+" (age)"+":
"+m);
+ }
+ }
+ }
+ if (ret == null) {
+ if(logMINOR) Logger.minor(this, "Not in
_unclaimed");
+ // Insert filter into filter list in order of
timeout
+ ListIterator i = _filters.listIterator();
+ while (true) {
+ if (!i.hasNext()) {
+ i.add(filter);
+ if(logMINOR) Logger.minor(this,
"Added at end");
+ break;
+ }
+ MessageFilter mf = (MessageFilter)
i.next();
+ if (mf.getTimeout() >
filter.getTimeout()) {
+ i.previous();
+ i.add(filter);
+ if(logMINOR) Logger.minor(this,
"Added in middle - mf timeout="+mf.getTimeout()+" - my
timeout="+filter.getTimeout());
+ break;
+ }
+ }
+ }
+ }
+ if(ret != null) {
+ filter.onMatched();
+ filter.clearMatched();
+ }
+ }
+
+ /**
+ * Wait for a filter to trigger, or timeout. Blocks until either the
trigger is activated, or it times
+ * out, or the peer is disconnected.
+ * @param filter The filter to wait for.
+ * @param ctr Byte counter to add bytes from the message to.
+ * @return Either a message, or null if the filter timed out.
+ * @throws DisconnectedException If the single peer being waited for
disconnects.
+ */
public Message waitFor(MessageFilter filter, ByteCounter ctr) throws
DisconnectedException {
boolean logDEBUG = Logger.shouldLog(Logger.DEBUG, this);
if(logDEBUG) Logger.debug(this, "Waiting for "+filter);
@@ -556,6 +624,10 @@
filter.clearMatched();
}
if(logDEBUG) Logger.debug(this, "Returning "+ret+" from
"+filter);
+ } else {
+ // Matched an unclaimed packet
+ filter.onMatched();
+ filter.clearMatched();
}
// Probably get rid...
// if (Dijjer.getDijjer().getDumpMessageWaitTimes() != null) {
Modified: trunk/freenet/src/freenet/io/xfer/BulkTransmitter.java
===================================================================
--- trunk/freenet/src/freenet/io/xfer/BulkTransmitter.java 2007-05-31
22:40:13 UTC (rev 13425)
+++ trunk/freenet/src/freenet/io/xfer/BulkTransmitter.java 2007-05-31
22:54:42 UTC (rev 13426)
@@ -3,7 +3,11 @@
* http://www.gnu.org/ for further details of the GPL. */
package freenet.io.xfer;
+import freenet.io.comm.AsyncMessageFilterCallback;
import freenet.io.comm.DMT;
+import freenet.io.comm.DisconnectedException;
+import freenet.io.comm.Message;
+import freenet.io.comm.MessageFilter;
import freenet.io.comm.NotConnectedException;
import freenet.io.comm.PeerContext;
import freenet.support.BitArray;
@@ -31,9 +35,10 @@
final long peerBootID;
/** The overall hard bandwidth limiter */
final DoubleTokenBucket masterThrottle;
+ private boolean sentCancel;
+ private boolean finished;
-
- public BulkTransmitter(PartiallyReceivedBulk prb, PeerContext peer,
long uid, DoubleTokenBucket masterThrottle) {
+ public BulkTransmitter(PartiallyReceivedBulk prb, PeerContext peer,
long uid, DoubleTokenBucket masterThrottle) throws DisconnectedException {
this.prb = prb;
this.peer = peer;
this.uid = uid;
@@ -48,6 +53,24 @@
blocksNotSentButPresent = prb.cloneBlocksReceived();
prb.add(this);
}
+ try {
+
prb.usm.addAsyncFilter(MessageFilter.create().setNoTimeout().setSource(peer).setType(DMT.FNPBulkReceiveAborted).setField(DMT.UID,
uid),
+ new AsyncMessageFilterCallback() {
+ public void onMatched(Message
m) {
+ cancel();
+ }
+ public boolean shouldTimeout() {
+
synchronized(BulkTransmitter.this) {
+ if(cancelled ||
finished) return true;
+ }
+
if(BulkTransmitter.this.prb.isAborted()) return true;
+ return false;
+ }
+ });
+ } catch (DisconnectedException e) {
+ cancel();
+ throw e;
+ }
}
/**
@@ -64,22 +87,26 @@
* Called when the PRB is aborted.
*/
public void onAborted() {
- try {
- peer.sendAsync(DMT.createFNPBulkSendAborted(uid), null,
0, null);
- } catch (NotConnectedException e) {
- // Cool
- }
+ sendAbortedMessage();
synchronized(this) {
notifyAll();
}
}
- public void cancel() {
+ private void sendAbortedMessage() {
+ synchronized(this) {
+ if(sentCancel) return;
+ sentCancel = true;
+ }
try {
peer.sendAsync(DMT.createFNPBulkSendAborted(uid), null,
0, null);
} catch (NotConnectedException e) {
// Cool
}
+ }
+
+ public void cancel() {
+ sendAbortedMessage();
synchronized(this) {
cancelled = true;
notifyAll();
Modified: trunk/freenet/src/freenet/io/xfer/PartiallyReceivedBulk.java
===================================================================
--- trunk/freenet/src/freenet/io/xfer/PartiallyReceivedBulk.java
2007-05-31 22:40:13 UTC (rev 13425)
+++ trunk/freenet/src/freenet/io/xfer/PartiallyReceivedBulk.java
2007-05-31 22:54:42 UTC (rev 13426)
@@ -7,6 +7,7 @@
import freenet.io.comm.DMT;
import freenet.io.comm.RetrievalException;
+import freenet.io.comm.UdpSocketManager;
import freenet.node.FNPPacketMangler;
import freenet.support.BitArray;
import freenet.support.Logger;
@@ -30,6 +31,7 @@
private final BitArray blocksReceived;
final int blocks;
private BulkTransmitter[] transmitters;
+ final UdpSocketManager usm;
/** The one and only BulkReceiver */
private BulkReceiver recv;
private int blocksReceivedCount;
@@ -47,10 +49,11 @@
* @param initialState If true, assume all blocks have been received.
If false, assume no blocks have
* been received.
*/
- public PartiallyReceivedBulk(long size, int blockSize,
RandomAccessThing raf, boolean initialState) {
+ public PartiallyReceivedBulk(UdpSocketManager usm, long size, int
blockSize, RandomAccessThing raf, boolean initialState) {
this.size = size;
this.blockSize = blockSize;
this.raf = raf;
+ this.usm = usm;
long blocks = size / blockSize + (size % blockSize > 0 ? 1 : 0);
if(blocks > Integer.MAX_VALUE)
throw new IllegalArgumentException("Too big");