Author: toad
Date: 2007-05-31 22:54:42 +0000 (Thu, 31 May 2007)
New Revision: 13426

Modified:
   trunk/freenet/src/freenet/io/comm/UdpSocketManager.java
   trunk/freenet/src/freenet/io/xfer/BulkTransmitter.java
   trunk/freenet/src/freenet/io/xfer/PartiallyReceivedBulk.java
Log:
Finish async API, use it to wait for cancellation in BulkTransmitter.

Modified: trunk/freenet/src/freenet/io/comm/UdpSocketManager.java
===================================================================
--- trunk/freenet/src/freenet/io/comm/UdpSocketManager.java     2007-05-31 
22:40:13 UTC (rev 13425)
+++ trunk/freenet/src/freenet/io/comm/UdpSocketManager.java     2007-05-31 
22:54:42 UTC (rev 13426)
@@ -471,6 +471,74 @@
            }
        }

+
+       public void addAsyncFilter(MessageFilter filter, 
AsyncMessageFilterCallback callback) throws DisconnectedException {
+               filter.setAsyncCallback(callback);
+               boolean logDEBUG = Logger.shouldLog(Logger.DEBUG, this);
+               if(logDEBUG) Logger.debug(this, "Adding async filter "+filter+" 
for "+callback);
+               Message ret = null;
+               if((lowLevelFilter != null) && (filter._source != null) && 
+                       filter.matchesDroppedConnection() &&
+                       lowLevelFilter.isDisconnected(filter._source))
+                   throw new DisconnectedException();
+               // Check to see whether the filter matches any of the recently 
_unclaimed messages
+               // Drop any _unclaimed messages that the filter doesn't match 
that are also older than MAX_UNCLAIMED_FIFO_ITEM_LIFETIME
+               long now = System.currentTimeMillis();
+               long messageDropTime = now - MAX_UNCLAIMED_FIFO_ITEM_LIFETIME;
+               long messageLifeTime = 0;
+               synchronized (_filters) {
+                       if(logMINOR) Logger.minor(this, "Checking _unclaimed");
+                       for (ListIterator i = _unclaimed.listIterator(); 
i.hasNext();) {
+                               Message m = (Message) i.next();
+                               if (filter.match(m)) {
+                                       i.remove();
+                                       ret = m;
+                                       if(logMINOR) Logger.debug(this, 
"Matching from _unclaimed");
+                                       break;
+                               } else if (m.localInstantiationTime < 
messageDropTime) {
+                                       i.remove();
+                                       messageLifeTime = now - 
m.localInstantiationTime;
+                                       if ((m.getSource()) instanceof 
PeerNode) {
+                                               Logger.normal(this, "Dropping 
unclaimed from "+m.getSource().getPeer()+", lived 
"+TimeUtil.formatTime(messageLifeTime, 2, true)+" (age)"+": "+m);
+                                       } else {
+                                               Logger.normal(this, "Dropping 
unclaimed, lived "+TimeUtil.formatTime(messageLifeTime, 2, true)+" (age)"+": 
"+m);
+                                       }
+                               }
+                       }
+                       if (ret == null) {
+                               if(logMINOR) Logger.minor(this, "Not in 
_unclaimed");
+                           // Insert filter into filter list in order of 
timeout
+                               ListIterator i = _filters.listIterator();
+                               while (true) {
+                                       if (!i.hasNext()) {
+                                               i.add(filter);
+                                               if(logMINOR) Logger.minor(this, 
"Added at end");
+                                               break;
+                                       }
+                                       MessageFilter mf = (MessageFilter) 
i.next();
+                                       if (mf.getTimeout() > 
filter.getTimeout()) {
+                                               i.previous();
+                                               i.add(filter);
+                                               if(logMINOR) Logger.minor(this, 
"Added in middle - mf timeout="+mf.getTimeout()+" - my 
timeout="+filter.getTimeout());
+                                               break;
+                                       }
+                               }
+                       }
+               }
+               if(ret != null) {
+                       filter.onMatched();
+                       filter.clearMatched();
+               }
+       }
+
+       /**
+        * Wait for a filter to trigger, or timeout. Blocks until either the 
trigger is activated, or it times
+        * out, or the peer is disconnected.
+        * @param filter The filter to wait for.
+        * @param ctr Byte counter to add bytes from the message to.
+        * @return Either a message, or null if the filter timed out.
+        * @throws DisconnectedException If the single peer being waited for 
disconnects.
+        */
        public Message waitFor(MessageFilter filter, ByteCounter ctr) throws 
DisconnectedException {
                boolean logDEBUG = Logger.shouldLog(Logger.DEBUG, this);
                if(logDEBUG) Logger.debug(this, "Waiting for "+filter);
@@ -556,6 +624,10 @@
                                filter.clearMatched();
                        }
                        if(logDEBUG) Logger.debug(this, "Returning "+ret+" from 
"+filter);
+               } else {
+                       // Matched an unclaimed packet
+                       filter.onMatched();
+                       filter.clearMatched();
                }
                // Probably get rid...
 //             if (Dijjer.getDijjer().getDumpMessageWaitTimes() != null) {

Modified: trunk/freenet/src/freenet/io/xfer/BulkTransmitter.java
===================================================================
--- trunk/freenet/src/freenet/io/xfer/BulkTransmitter.java      2007-05-31 
22:40:13 UTC (rev 13425)
+++ trunk/freenet/src/freenet/io/xfer/BulkTransmitter.java      2007-05-31 
22:54:42 UTC (rev 13426)
@@ -3,7 +3,11 @@
  * http://www.gnu.org/ for further details of the GPL. */
 package freenet.io.xfer;

+import freenet.io.comm.AsyncMessageFilterCallback;
 import freenet.io.comm.DMT;
+import freenet.io.comm.DisconnectedException;
+import freenet.io.comm.Message;
+import freenet.io.comm.MessageFilter;
 import freenet.io.comm.NotConnectedException;
 import freenet.io.comm.PeerContext;
 import freenet.support.BitArray;
@@ -31,9 +35,10 @@
        final long peerBootID;
        /** The overall hard bandwidth limiter */
        final DoubleTokenBucket masterThrottle;
+       private boolean sentCancel;
+       private boolean finished;

-
-       public BulkTransmitter(PartiallyReceivedBulk prb, PeerContext peer, 
long uid, DoubleTokenBucket masterThrottle) {
+       public BulkTransmitter(PartiallyReceivedBulk prb, PeerContext peer, 
long uid, DoubleTokenBucket masterThrottle) throws DisconnectedException {
                this.prb = prb;
                this.peer = peer;
                this.uid = uid;
@@ -48,6 +53,24 @@
                        blocksNotSentButPresent = prb.cloneBlocksReceived();
                        prb.add(this);
                }
+               try {
+                       
prb.usm.addAsyncFilter(MessageFilter.create().setNoTimeout().setSource(peer).setType(DMT.FNPBulkReceiveAborted).setField(DMT.UID,
 uid),
+                                       new AsyncMessageFilterCallback() {
+                                               public void onMatched(Message 
m) {
+                                                       cancel();
+                                               }
+                                               public boolean shouldTimeout() {
+                                                       
synchronized(BulkTransmitter.this) {
+                                                               if(cancelled || 
finished) return true;
+                                                       }
+                                                       
if(BulkTransmitter.this.prb.isAborted()) return true;
+                                                       return false;
+                                               }
+                       });
+               } catch (DisconnectedException e) {
+                       cancel();
+                       throw e;
+               }
        }

        /**
@@ -64,22 +87,26 @@
         * Called when the PRB is aborted.
         */
        public void onAborted() {
-               try {
-                       peer.sendAsync(DMT.createFNPBulkSendAborted(uid), null, 
0, null);
-               } catch (NotConnectedException e) {
-                       // Cool
-               }
+               sendAbortedMessage();
                synchronized(this) {
                        notifyAll();
                }
        }

-       public void cancel() {
+       private void sendAbortedMessage() {
+               synchronized(this) {
+                       if(sentCancel) return;
+                       sentCancel = true;
+               }
                try {
                        peer.sendAsync(DMT.createFNPBulkSendAborted(uid), null, 
0, null);
                } catch (NotConnectedException e) {
                        // Cool
                }
+       }
+
+       public void cancel() {
+               sendAbortedMessage();
                synchronized(this) {
                        cancelled = true;
                        notifyAll();

Modified: trunk/freenet/src/freenet/io/xfer/PartiallyReceivedBulk.java
===================================================================
--- trunk/freenet/src/freenet/io/xfer/PartiallyReceivedBulk.java        
2007-05-31 22:40:13 UTC (rev 13425)
+++ trunk/freenet/src/freenet/io/xfer/PartiallyReceivedBulk.java        
2007-05-31 22:54:42 UTC (rev 13426)
@@ -7,6 +7,7 @@

 import freenet.io.comm.DMT;
 import freenet.io.comm.RetrievalException;
+import freenet.io.comm.UdpSocketManager;
 import freenet.node.FNPPacketMangler;
 import freenet.support.BitArray;
 import freenet.support.Logger;
@@ -30,6 +31,7 @@
        private final BitArray blocksReceived;
        final int blocks;
        private BulkTransmitter[] transmitters;
+       final UdpSocketManager usm;
        /** The one and only BulkReceiver */
        private BulkReceiver recv;
        private int blocksReceivedCount;
@@ -47,10 +49,11 @@
         * @param initialState If true, assume all blocks have been received. 
If false, assume no blocks have
         * been received.
         */
-       public PartiallyReceivedBulk(long size, int blockSize, 
RandomAccessThing raf, boolean initialState) {
+       public PartiallyReceivedBulk(UdpSocketManager usm, long size, int 
blockSize, RandomAccessThing raf, boolean initialState) {
                this.size = size;
                this.blockSize = blockSize;
                this.raf = raf;
+               this.usm = usm;
                long blocks = size / blockSize + (size % blockSize > 0 ? 1 : 0);
                if(blocks > Integer.MAX_VALUE)
                        throw new IllegalArgumentException("Too big");


Reply via email to