Author: toad
Date: 2008-03-07 15:18:04 +0000 (Fri, 07 Mar 2008)
New Revision: 18410

Added:
   trunk/freenet/src/freenet/io/xfer/WaitedTooLongException.java
Modified:
   trunk/freenet/src/freenet/io/comm/PeerContext.java
   trunk/freenet/src/freenet/io/comm/RetrievalException.java
   trunk/freenet/src/freenet/io/xfer/BlockTransmitter.java
   trunk/freenet/src/freenet/io/xfer/BulkTransmitter.java
   trunk/freenet/src/freenet/io/xfer/PacketThrottle.java
   trunk/freenet/src/freenet/io/xfer/ThrottleDeprecatedException.java
   trunk/freenet/src/freenet/node/PeerNode.java
Log:
Add a timeout for sendThrottledMessage(). Should also fix leaking 
BlockTransmitters on disconnection.

Modified: trunk/freenet/src/freenet/io/comm/PeerContext.java
===================================================================
--- trunk/freenet/src/freenet/io/comm/PeerContext.java  2008-03-07 13:29:54 UTC 
(rev 18409)
+++ trunk/freenet/src/freenet/io/comm/PeerContext.java  2008-03-07 15:18:04 UTC 
(rev 18410)
@@ -6,6 +6,7 @@
 import java.lang.ref.WeakReference;

 import freenet.io.xfer.PacketThrottle;
+import freenet.io.xfer.WaitedTooLongException;
 import freenet.node.OutgoingPacketMangler;

 /**
@@ -35,7 +36,7 @@
        public void sendAsync(Message msg, AsyncMessageCallback cb, int 
alreadyReportedBytes, ByteCounter ctr) throws NotConnectedException;

        /** Send a throttled message to the node (may block for a long time). */
-       public void sendThrottledMessage(Message msg, int packetSize, 
ByteCounter ctr) throws NotConnectedException;
+       public void sendThrottledMessage(Message msg, int packetSize, 
ByteCounter ctr, int timeout) throws NotConnectedException, 
WaitedTooLongException;

        /** Get the current boot ID. This is a random number that changes every 
time the node starts up. */
        public long getBootID();

Modified: trunk/freenet/src/freenet/io/comm/RetrievalException.java
===================================================================
--- trunk/freenet/src/freenet/io/comm/RetrievalException.java   2008-03-07 
13:29:54 UTC (rev 18409)
+++ trunk/freenet/src/freenet/io/comm/RetrievalException.java   2008-03-07 
15:18:04 UTC (rev 18410)
@@ -39,6 +39,7 @@
     public static final int NO_DATAINSERT = 8;
     public static final int CANCELLED_BY_RECEIVER = 9;
        public static final int RECEIVER_DIED = 11;
+       public static final int UNABLE_TO_SEND_BLOCK_WITHIN_TIMEOUT = 12;

        int _reason;
        String _cause;
@@ -83,6 +84,8 @@
                                return "CANCELLED_BY_RECEIVER";
                        case UNKNOWN:
                                return "UNKNOWN";
+                       case UNABLE_TO_SEND_BLOCK_WITHIN_TIMEOUT:
+                               return "UNABLE_TO_SEND_BLOCK_WITHIN_TIMEOUT";
                        default:
                                return "UNKNOWN ("+reason+")";
                }

Modified: trunk/freenet/src/freenet/io/xfer/BlockTransmitter.java
===================================================================
--- trunk/freenet/src/freenet/io/xfer/BlockTransmitter.java     2008-03-07 
13:29:54 UTC (rev 18409)
+++ trunk/freenet/src/freenet/io/xfer/BlockTransmitter.java     2008-03-07 
15:18:04 UTC (rev 18410)
@@ -97,7 +97,7 @@
                                        }
                                        int totalPackets;
                                        try {
-                                               
_destination.sendThrottledMessage(DMT.createPacketTransmit(_uid, packetNo, 
_sentPackets, _prb.getPacket(packetNo)), _prb._packetSize, _ctr);
+                                               
_destination.sendThrottledMessage(DMT.createPacketTransmit(_uid, packetNo, 
_sentPackets, _prb.getPacket(packetNo)), _prb._packetSize, _ctr, SEND_TIMEOUT);
                                                if(_ctr != null) 
_ctr.sentPayload(_prb._packetSize);
                                                
totalPackets=_prb.getNumPackets();
                                        } catch (NotConnectedException e) {
@@ -108,6 +108,12 @@
                                                Logger.normal(this, 
"Terminating send due to abort: "+e);
                                                //the send() thread should 
notice...
                                                return;
+                                       } catch (WaitedTooLongException e) {
+                                               Logger.normal(this, "Waited too 
long to send packet, aborting");
+                                               synchronized(_senderThread) {
+                                                       _sendComplete = true;
+                                               }
+                                               return;
                                        }
                                        synchronized (_senderThread) {
                                                _sentPackets.setBit(packetNo, 
true);
@@ -164,6 +170,9 @@
                        executor.execute(_senderThread, toString());

                        while (true) {
+                               synchronized(_senderThread) {
+                                       if(_sendComplete) return false;
+                               }
                                Message msg;
                                boolean logMINOR = 
Logger.shouldLog(Logger.MINOR, this);
                                try {

Modified: trunk/freenet/src/freenet/io/xfer/BulkTransmitter.java
===================================================================
--- trunk/freenet/src/freenet/io/xfer/BulkTransmitter.java      2008-03-07 
13:29:54 UTC (rev 18409)
+++ trunk/freenet/src/freenet/io/xfer/BulkTransmitter.java      2008-03-07 
15:18:04 UTC (rev 18410)
@@ -243,7 +243,7 @@

                        // Congestion control and bandwidth limiting
                        try {
-                               
peer.sendThrottledMessage(DMT.createFNPBulkPacketSend(uid, blockNo, buf), 
prb.blockSize, ctr);
+                               
peer.sendThrottledMessage(DMT.createFNPBulkPacketSend(uid, blockNo, buf), 
prb.blockSize, ctr, BulkReceiver.TIMEOUT);
                                if(ctr != null) ctr.sentPayload(prb.blockSize);
                                synchronized(this) {
                                        blocksNotSentButPresent.setBit(blockNo, 
false);
@@ -254,6 +254,9 @@
                                if(logMINOR)
                                        Logger.minor(this, "Canclled: not 
connected "+this);
                                return false;
+                       } catch (WaitedTooLongException e) {
+                               Logger.error(this, "Failed to send bulk packet 
"+blockNo+" for "+this);
+                               return false;
                        }
                }
        }

Modified: trunk/freenet/src/freenet/io/xfer/PacketThrottle.java
===================================================================
--- trunk/freenet/src/freenet/io/xfer/PacketThrottle.java       2008-03-07 
13:29:54 UTC (rev 18409)
+++ trunk/freenet/src/freenet/io/xfer/PacketThrottle.java       2008-03-07 
15:18:04 UTC (rev 18410)
@@ -159,7 +159,7 @@
                return ((PACKET_SIZE * 1000.0 / getDelay()));
        }

-       public void sendThrottledMessage(Message msg, PeerContext peer, 
DoubleTokenBucket overallThrottle, int packetSize, ByteCounter ctr) throws 
NotConnectedException, ThrottleDeprecatedException {
+       public void sendThrottledMessage(Message msg, PeerContext peer, 
DoubleTokenBucket overallThrottle, int packetSize, ByteCounter ctr, long 
deadline) throws NotConnectedException, ThrottleDeprecatedException, 
WaitedTooLongException {
                long start = System.currentTimeMillis();
                long bootID = peer.getBootID();
                synchronized(this) {
@@ -177,8 +177,22 @@
                                        break;
                                }
                                if(logMINOR) Logger.minor(this, "Window size: 
"+windowSize+" packets in flight "+_packetsInFlight+" for "+this);
+                               long now = System.currentTimeMillis();
+                               int waitFor = (int)Math.min(Integer.MAX_VALUE, 
deadline - now);
+                               if(waitFor <= 0) {
+                                       // Double-check.
+                                       if(!peer.isConnected()) {
+                                               Logger.error(this, "Not 
notified of disconnection before timeout");
+                                               throw new 
NotConnectedException();
+                                       }
+                                       if(bootID != peer.getBootID()) {
+                                               Logger.error(this, "Not 
notified of reconnection before timeout");
+                                               throw new 
NotConnectedException();
+                                       }
+                                       Logger.error(this, "Unable to send 
throttled message, waited "+(now-start)+"ms");
+                               }
                                try {
-                                       wait();
+                                       wait(waitFor);
                                } catch (InterruptedException e) {
                                        // Ignore
                                }

Modified: trunk/freenet/src/freenet/io/xfer/ThrottleDeprecatedException.java
===================================================================
--- trunk/freenet/src/freenet/io/xfer/ThrottleDeprecatedException.java  
2008-03-07 13:29:54 UTC (rev 18409)
+++ trunk/freenet/src/freenet/io/xfer/ThrottleDeprecatedException.java  
2008-03-07 15:18:04 UTC (rev 18410)
@@ -1,3 +1,6 @@
+/* This code is part of Freenet. It is distributed under the GNU General
+ * Public License, version 2 (or at your option any later version). See
+ * http://www.gnu.org/ for further details of the GPL. */
 package freenet.io.xfer;

 /**

Added: trunk/freenet/src/freenet/io/xfer/WaitedTooLongException.java
===================================================================
--- trunk/freenet/src/freenet/io/xfer/WaitedTooLongException.java               
                (rev 0)
+++ trunk/freenet/src/freenet/io/xfer/WaitedTooLongException.java       
2008-03-07 15:18:04 UTC (rev 18410)
@@ -0,0 +1,12 @@
+/* This code is part of Freenet. It is distributed under the GNU General
+ * Public License, version 2 (or at your option any later version). See
+ * http://www.gnu.org/ for further details of the GPL. */
+package freenet.io.xfer;
+
+/**
+ * Thrown when we wait too long to send a throttled packet.
+ * @author toad
+ */
+public class WaitedTooLongException extends Exception {
+
+}

Modified: trunk/freenet/src/freenet/node/PeerNode.java
===================================================================
--- trunk/freenet/src/freenet/node/PeerNode.java        2008-03-07 13:29:54 UTC 
(rev 18409)
+++ trunk/freenet/src/freenet/node/PeerNode.java        2008-03-07 15:18:04 UTC 
(rev 18410)
@@ -53,6 +53,7 @@
 import freenet.io.comm.SocketHandler;
 import freenet.io.xfer.PacketThrottle;
 import freenet.io.xfer.ThrottleDeprecatedException;
+import freenet.io.xfer.WaitedTooLongException;
 import freenet.keys.ClientSSK;
 import freenet.keys.FreenetURI;
 import freenet.keys.Key;
@@ -3644,10 +3645,11 @@
                return resendBytesSent;
        }

-       public void sendThrottledMessage(Message msg, int packetSize, 
ByteCounter ctr) throws NotConnectedException {
+       public void sendThrottledMessage(Message msg, int packetSize, 
ByteCounter ctr, int timeout) throws NotConnectedException, 
WaitedTooLongException {
+               long deadline = System.currentTimeMillis() + timeout;
                for(int i=0;i<100;i++) {
                        try {
-                               getThrottle().sendThrottledMessage(msg, this, 
node.outputThrottle, packetSize, ctr);
+                               getThrottle().sendThrottledMessage(msg, this, 
node.outputThrottle, packetSize, ctr, deadline);
                                return;
                        } catch (ThrottleDeprecatedException e) {
                                // Try with the new throttle. We don't need it, 
we'll get it from getThrottle().


Reply via email to