Author: robert
Date: 2008-01-24 16:43:20 +0000 (Thu, 24 Jan 2008)
New Revision: 17233

Modified:
   trunk/freenet/src/freenet/io/xfer/BlockReceiver.java
Log:
collect leftover messages for up to 5 seconds, and reuse message filters for 
waitFor(): less memory churn


Modified: trunk/freenet/src/freenet/io/xfer/BlockReceiver.java
===================================================================
--- trunk/freenet/src/freenet/io/xfer/BlockReceiver.java        2008-01-24 
16:15:19 UTC (rev 17232)
+++ trunk/freenet/src/freenet/io/xfer/BlockReceiver.java        2008-01-24 
16:43:20 UTC (rev 17233)
@@ -21,6 +21,7 @@
 import java.util.HashMap;
 import java.util.LinkedList;

+import freenet.io.comm.AsyncMessageFilterCallback;
 import freenet.io.comm.ByteCounter;
 import freenet.io.comm.DMT;
 import freenet.io.comm.DisconnectedException;
@@ -37,13 +38,14 @@
 /**
  * @author ian
  */
-public class BlockReceiver {
+public class BlockReceiver implements AsyncMessageFilterCallback {

        public static final int RECEIPT_TIMEOUT = 30000;
        // TODO: This should be proportional to the calculated round-trip-time, 
not a constant
        public static final int MAX_ROUND_TRIP_TIME = RECEIPT_TIMEOUT;
        public static final int MAX_CONSECUTIVE_MISSING_PACKET_REPORTS = 4;
        public static final int MAX_SEND_INTERVAL = 500;
+       public static final int CLEANUP_TIMEOUT = 5000;
        PartiallyReceivedBlock _prb;
        PeerContext _sender;
        long _uid;
@@ -52,7 +54,11 @@
        HashMap _recentlyReportedMissingPackets = new HashMap();
        ByteCounter _ctr;
        boolean sentAborted;
+       private MessageFilter discardFilter;
+       private long discardEndTime;

+       boolean logMINOR=Logger.shouldLog(Logger.MINOR, this);
+       
        public BlockReceiver(MessageCore usm, PeerContext sender, long uid, 
PartiallyReceivedBlock prb, ByteCounter ctr) {
                _sender = sender;
                _prb = prb;
@@ -68,15 +74,15 @@

        public byte[] receive() throws RetrievalException {
                int consecutiveMissingPacketReports = 0;
-               boolean logMINOR=Logger.shouldLog(Logger.MINOR, this);
                try {
+                       MessageFilter mfPacketTransmit = 
MessageFilter.create().setTimeout(RECEIPT_TIMEOUT).setType(DMT.packetTransmit).setField(DMT.UID,
 _uid).setSource(_sender);
+                       MessageFilter mfAllSent = 
MessageFilter.create().setType(DMT.allSent).setField(DMT.UID, 
_uid).setSource(_sender);
+                       MessageFilter mfSendAborted = 
MessageFilter.create().setType(DMT.sendAborted).setField(DMT.UID, 
_uid).setSource(_sender);
+                       MessageFilter 
relevantMessages=mfPacketTransmit.or(mfAllSent.or(mfSendAborted));
                while (!_prb.allReceived()) {
                        Message m1;
             try {
-               MessageFilter mfPacketTransmit = 
MessageFilter.create().setTimeout(RECEIPT_TIMEOUT).setType(DMT.packetTransmit).setField(DMT.UID,
 _uid).setSource(_sender);
-               MessageFilter mfAllSent = 
MessageFilter.create().setType(DMT.allSent).setField(DMT.UID, 
_uid).setSource(_sender);
-               MessageFilter mfSendAborted = 
MessageFilter.create().setType(DMT.sendAborted).setField(DMT.UID, 
_uid).setSource(_sender);
-                m1 = 
_usm.waitFor(mfPacketTransmit.or(mfAllSent.or(mfSendAborted)), _ctr);
+               m1 = _usm.waitFor(relevantMessages, _ctr);
                 if(!_sender.isConnected()) throw new DisconnectedException();
             } catch (DisconnectedException e1) {
                 Logger.normal(this, "Disconnected during receive: "+_uid+" 
from "+_sender);
@@ -151,6 +157,9 @@
                        }
                }
                _usm.send(_sender, DMT.createAllReceived(_uid), _ctr);
+               discardEndTime=System.currentTimeMillis()+CLEANUP_TIMEOUT;
+               discardFilter=relevantMessages;
+               maybeResetDiscardFilter();
                return _prb.getBlock();
                } catch(NotConnectedException e) {
                    throw new 
RetrievalException(RetrievalException.SENDER_DISCONNECTED);
@@ -168,4 +177,36 @@
                        }
                }
        }
+       
+       private void maybeResetDiscardFilter() {
+               long timeleft=discardEndTime-System.currentTimeMillis();
+               if (timeleft>0) {
+                       try {
+                               discardFilter.setTimeout((int)timeleft);
+                               _usm.addAsyncFilter(discardFilter, this);
+                       } catch (DisconnectedException e) {
+                               //ignore
+                       }
+               }
+       }
+       
+       /**
+        * Used to discard leftover messages, usually just packetTransmit and 
allSent.
+        * allSent is quite common, as the receive() routine usually quits 
immediately on receiving all packets.
+        * packetTransmit is less common: it arrives when receive() requested what it 
thought was a missing packet, but the packet had only been reordered.
+        */
+       public void onMatched(Message m) {
+               if (logMINOR)
+                       Logger.minor(this, "discarding message post-receive: 
"+m);
+               maybeResetDiscardFilter();                                      
                                                           
+       }
+       
+       public boolean shouldTimeout() {
+               return false;
+       }
+       
+       public void onTimeout() {
+               //ignore
+       }
+       
 }


Reply via email to