Author: robert
Date: 2008-01-24 16:43:20 +0000 (Thu, 24 Jan 2008)
New Revision: 17233
Modified:
trunk/freenet/src/freenet/io/xfer/BlockReceiver.java
Log:
collect leftover messages for up to 5 seconds, and reuse message filters for
waitFor(): less memory churn
Modified: trunk/freenet/src/freenet/io/xfer/BlockReceiver.java
===================================================================
--- trunk/freenet/src/freenet/io/xfer/BlockReceiver.java 2008-01-24
16:15:19 UTC (rev 17232)
+++ trunk/freenet/src/freenet/io/xfer/BlockReceiver.java 2008-01-24
16:43:20 UTC (rev 17233)
@@ -21,6 +21,7 @@
import java.util.HashMap;
import java.util.LinkedList;
+import freenet.io.comm.AsyncMessageFilterCallback;
import freenet.io.comm.ByteCounter;
import freenet.io.comm.DMT;
import freenet.io.comm.DisconnectedException;
@@ -37,13 +38,14 @@
/**
* @author ian
*/
-public class BlockReceiver {
+public class BlockReceiver implements AsyncMessageFilterCallback {
public static final int RECEIPT_TIMEOUT = 30000;
// TODO: This should be proportional to the calculated round-trip-time,
not a constant
public static final int MAX_ROUND_TRIP_TIME = RECEIPT_TIMEOUT;
public static final int MAX_CONSECUTIVE_MISSING_PACKET_REPORTS = 4;
public static final int MAX_SEND_INTERVAL = 500;
+ public static final int CLEANUP_TIMEOUT = 5000;
PartiallyReceivedBlock _prb;
PeerContext _sender;
long _uid;
@@ -52,7 +54,11 @@
HashMap _recentlyReportedMissingPackets = new HashMap();
ByteCounter _ctr;
boolean sentAborted;
+ private MessageFilter discardFilter;
+ private long discardEndTime;
+ boolean logMINOR=Logger.shouldLog(Logger.MINOR, this);
+
public BlockReceiver(MessageCore usm, PeerContext sender, long uid,
PartiallyReceivedBlock prb, ByteCounter ctr) {
_sender = sender;
_prb = prb;
@@ -68,15 +74,15 @@
public byte[] receive() throws RetrievalException {
int consecutiveMissingPacketReports = 0;
- boolean logMINOR=Logger.shouldLog(Logger.MINOR, this);
try {
+ MessageFilter mfPacketTransmit =
MessageFilter.create().setTimeout(RECEIPT_TIMEOUT).setType(DMT.packetTransmit).setField(DMT.UID,
_uid).setSource(_sender);
+ MessageFilter mfAllSent =
MessageFilter.create().setType(DMT.allSent).setField(DMT.UID,
_uid).setSource(_sender);
+ MessageFilter mfSendAborted =
MessageFilter.create().setType(DMT.sendAborted).setField(DMT.UID,
_uid).setSource(_sender);
+ MessageFilter
relevantMessages=mfPacketTransmit.or(mfAllSent.or(mfSendAborted));
while (!_prb.allReceived()) {
Message m1;
try {
- MessageFilter mfPacketTransmit =
MessageFilter.create().setTimeout(RECEIPT_TIMEOUT).setType(DMT.packetTransmit).setField(DMT.UID,
_uid).setSource(_sender);
- MessageFilter mfAllSent =
MessageFilter.create().setType(DMT.allSent).setField(DMT.UID,
_uid).setSource(_sender);
- MessageFilter mfSendAborted =
MessageFilter.create().setType(DMT.sendAborted).setField(DMT.UID,
_uid).setSource(_sender);
- m1 =
_usm.waitFor(mfPacketTransmit.or(mfAllSent.or(mfSendAborted)), _ctr);
+ m1 = _usm.waitFor(relevantMessages, _ctr);
if(!_sender.isConnected()) throw new DisconnectedException();
} catch (DisconnectedException e1) {
Logger.normal(this, "Disconnected during receive: "+_uid+"
from "+_sender);
@@ -151,6 +157,9 @@
}
}
_usm.send(_sender, DMT.createAllReceived(_uid), _ctr);
+ discardEndTime=System.currentTimeMillis()+CLEANUP_TIMEOUT;
+ discardFilter=relevantMessages;
+ maybeResetDiscardFilter();
return _prb.getBlock();
} catch(NotConnectedException e) {
throw new
RetrievalException(RetrievalException.SENDER_DISCONNECTED);
@@ -168,4 +177,36 @@
}
}
}
+
+ private void maybeResetDiscardFilter() {
+ long timeleft=discardEndTime-System.currentTimeMillis();
+ if (timeleft>0) {
+ try {
+ discardFilter.setTimeout((int)timeleft);
+ _usm.addAsyncFilter(discardFilter, this);
+ } catch (DisconnectedException e) {
+ //ignore
+ }
+ }
+ }
+
+ /**
+ * Used to discard leftover messages, usually just packetTransmit and
allSent.
+ * allSent is quite common, as the receive() routine usually quits
immediately on receiving all packets.
+ * packetTransmit is less common; it occurs when receive() requested what it
thought was a missing packet, but the packet had only been reordered.
+ */
+ public void onMatched(Message m) {
+ if (logMINOR)
+ Logger.minor(this, "discarding message post-receive:
"+m);
+ maybeResetDiscardFilter();
+ }
+
+ public boolean shouldTimeout() {
+ return false;
+ }
+
+ public void onTimeout() {
+ //ignore
+ }
+
}