Author: toad
Date: 2007-06-01 11:52:21 +0000 (Fri, 01 Jun 2007)
New Revision: 13436

Modified:
   trunk/freenet/src/freenet/io/comm/DMT.java
   trunk/freenet/src/freenet/io/xfer/BulkReceiver.java
   trunk/freenet/src/freenet/io/xfer/BulkTransmitter.java
Log:
FNPBulkReceivedAll, timeouts, bugfixes

Modified: trunk/freenet/src/freenet/io/comm/DMT.java
===================================================================
--- trunk/freenet/src/freenet/io/comm/DMT.java  2007-06-01 11:34:51 UTC (rev 13435)
+++ trunk/freenet/src/freenet/io/comm/DMT.java  2007-06-01 11:52:21 UTC (rev 13436)
@@ -272,6 +272,16 @@
                return msg;
        }

+       public static final MessageType FNPBulkReceivedAll = new 
MessageType("FNPBulkReceivedAll") {{
+               addField(UID, Long.class);
+       }};
+       
+       public static final Message createFNPBulkReceivedAll(long uid) {
+               Message msg = new Message(FNPBulkReceivedAll);
+               msg.set(UID, uid);
+               return msg;
+       }
+       
        public static final MessageType testTransferSend = new 
MessageType("testTransferSend") {{
                addField(UID, Long.class);
        }};

Modified: trunk/freenet/src/freenet/io/xfer/BulkReceiver.java
===================================================================
--- trunk/freenet/src/freenet/io/xfer/BulkReceiver.java 2007-06-01 11:34:51 UTC (rev 13435)
+++ trunk/freenet/src/freenet/io/xfer/BulkReceiver.java 2007-06-01 11:52:21 UTC (rev 13436)
@@ -57,7 +57,14 @@
                MessageFilter mfSendKilled = 
MessageFilter.create().setSource(peer).setType(DMT.FNPBulkSendAborted) 
.setField(DMT.UID, uid).setTimeout(TIMEOUT);
                MessageFilter mfPacket = 
MessageFilter.create().setSource(peer).setType(DMT.FNPBulkPacketSend) 
.setField(DMT.UID, uid).setTimeout(TIMEOUT);
                while(true) {
-                       if(prb.hasWholeFile()) return true;
+                       if(prb.hasWholeFile()) {
+                               try {
+                                       
peer.sendAsync(DMT.createFNPBulkReceivedAll(uid), null, 0, null);
+                               } catch (NotConnectedException e) {
+                                       // Ignore, we have the data.
+                               }
+                               return true;
+                       }
                        Message m;
                        try {
                                m = prb.usm.waitFor(mfSendKilled.or(mfPacket), 
null);

Modified: trunk/freenet/src/freenet/io/xfer/BulkTransmitter.java
===================================================================
--- trunk/freenet/src/freenet/io/xfer/BulkTransmitter.java      2007-06-01 11:34:51 UTC (rev 13435)
+++ trunk/freenet/src/freenet/io/xfer/BulkTransmitter.java      2007-06-01 11:52:21 UTC (rev 13436)
@@ -12,6 +12,7 @@
 import freenet.io.comm.PeerContext;
 import freenet.support.BitArray;
 import freenet.support.DoubleTokenBucket;
+import freenet.support.Logger;

 /**
  * Bulk data transfer (not block). Bulk transfer is designed for files which 
may be much bigger than a 
@@ -22,6 +23,8 @@
  */
 public class BulkTransmitter {

+       /** If no packets sent in this period, and no completion acknowledgement / cancellation, assume failure. */
+       static final int TIMEOUT = 5*60*1000;
        /** Available blocks */
        final PartiallyReceivedBulk prb;
        /** Peer who we are sending the data to */
@@ -67,6 +70,19 @@
                                                        return false;
                                                }
                        });
+                       
prb.usm.addAsyncFilter(MessageFilter.create().setNoTimeout().setSource(peer).setType(DMT.FNPBulkReceivedAll).setField(DMT.UID,
 uid),
+                                       new AsyncMessageFilterCallback() {
+                                               public void onMatched(Message 
m) {
+                                                       completed();
+                                               }
+                                               public boolean shouldTimeout() {
+                                                       
synchronized(BulkTransmitter.this) {
+                                                               if(cancelled || 
finished) return true;
+                                                       }
+                                                       
if(BulkTransmitter.this.prb.isAborted()) return true;
+                                                       return false;
+                                               }
+                       });
                } catch (DisconnectedException e) {
                        cancel();
                        throw e;
@@ -113,12 +129,24 @@
                }
                prb.remove(this);
        }
+
+       /** Like cancel(), but without the negative overtones: The client says it's got everything,
+        * we believe them (even if we haven't sent everything; maybe they had a partial). */
+       public void completed() {
+               synchronized(this) {
+                       finished = true;
+                       notifyAll();
+               }
+               prb.remove(this);
+       }

        /**
         * Send the file.
         * @return True if the file was successfully sent. False otherwise.
         */
        public boolean send() {
+               int packetSize = prb.getPacketSize();
+               long lastSentPacket = System.currentTimeMillis();
                while(true) {
                        if(prb.isAborted()) return false;
                        int blockNo;
@@ -130,23 +158,27 @@
                                prb.remove(BulkTransmitter.this);
                                return false;
                        }
-                       boolean hasAll = prb.hasWholeFile();
                        synchronized(this) {
+                               if(finished) return true;
                                if(cancelled) return false;
                                blockNo = blocksNotSentButPresent.firstOne();
                        }
-                       if(blockNo < 0 && hasAll) {
-                               prb.remove(BulkTransmitter.this);
-                               return true; // All done
-                       } else if(blockNo < 0) {
+                       if(blockNo < 0) {
+                               // Wait for a packet, BulkReceivedAll or BulkReceiveAborted
                                synchronized(this) {
                                        try {
                                                wait(60*1000);
                                        } catch (InterruptedException e) {
                                                // No problem
+                                               continue;
                                        }
-                                       continue;
                                }
+                               long end = System.currentTimeMillis();
+                               if(end - lastSentPacket > TIMEOUT) {
+                                       Logger.error(this, "Send timed out on 
"+this);
+                                       cancel();
+                                       return false;
+                               }
                        }
                        // Send a packet
                        byte[] buf = prb.getBlockData(blockNo);
@@ -159,12 +191,22 @@
                        long now = System.currentTimeMillis();
                        long waitUntil = peer.getThrottle().scheduleDelay(now);

-                       masterThrottle.blockingGrab(prb.getPacketSize());
+                       masterThrottle.blockingGrab(packetSize);

                        while((now = System.currentTimeMillis()) < waitUntil) {
                                long sleepTime = waitUntil - now;
                                try {
-                                       Thread.sleep(sleepTime);
+                                       synchronized(this) {
+                                               wait(sleepTime);
+                                               if(finished) {
+                                                       
masterThrottle.recycle(packetSize);
+                                                       return true;
+                                               }
+                                               if(cancelled) {
+                                                       
masterThrottle.recycle(packetSize);
+                                                       return false;
+                                               }
+                                       }
                                } catch (InterruptedException e) {
                                        // Ignore
                                }
@@ -173,6 +215,10 @@

                        try {
                                peer.sendAsync(DMT.createFNPBulkPacketSend(uid, 
blockNo, buf), null, 0, null);
+                               synchronized(this) {
+                                       blocksNotSentButPresent.setBit(blockNo, 
false);
+                               }
+                               lastSentPacket = System.currentTimeMillis();
                        } catch (NotConnectedException e) {
                                cancel();
                                return false;


Reply via email to