Author: toad
Date: 2005-11-26 00:55:28 +0000 (Sat, 26 Nov 2005)
New Revision: 7619

Added:
   trunk/freenet/src/freenet/node/ThrottledPacketLagException.java
Modified:
   trunk/freenet/src/freenet/io/xfer/BlockTransmitter.java
   trunk/freenet/src/freenet/node/InsertSender.java
   trunk/freenet/src/freenet/node/PeerNode.java
   trunk/freenet/src/freenet/node/ThrottledPacketSender.java
   trunk/freenet/src/freenet/node/Version.java
Log:
234: (mandatory)
Don't allow block transmitter packets to be queued for more than 30 seconds.
Cause a reject:overload if they do.

Modified: trunk/freenet/src/freenet/io/xfer/BlockTransmitter.java
===================================================================
--- trunk/freenet/src/freenet/io/xfer/BlockTransmitter.java     2005-11-26 
00:26:12 UTC (rev 7618)
+++ trunk/freenet/src/freenet/io/xfer/BlockTransmitter.java     2005-11-26 
00:55:28 UTC (rev 7619)
@@ -29,6 +29,7 @@
 import freenet.io.comm.PeerContext;
 import freenet.io.comm.UdpSocketManager;
 import freenet.node.PeerNode;
+import freenet.node.ThrottledPacketLagException;
 import freenet.support.BitArray;
 import freenet.support.Logger;

@@ -48,6 +49,8 @@
        LinkedList _unsent;
        Thread _receiverThread, _senderThread;
        BitArray _sentPackets;
+       boolean failedByOverload = false;
+       final PacketThrottle throttle;

        public BlockTransmitter(UdpSocketManager usm, PeerContext destination, 
long uid, PartiallyReceivedBlock source) {
                _usm = usm;
@@ -60,15 +63,7 @@
                        Logger.error(this, "Aborted during setup");
                        // Will throw on running
                }
-       }
-
-       public void sendAborted(int reason, String desc) throws 
NotConnectedException {
-               _usm.send(_destination, DMT.createSendAborted(_uid, reason, 
desc));
-       }
-       
-       public boolean send() {
-               final PacketThrottle throttle = 
PacketThrottle.getThrottle(_destination.getPeer(), _prb._packetSize);
-               _receiverThread = Thread.currentThread();
+               throttle = PacketThrottle.getThrottle(_destination.getPeer(), 
_prb._packetSize);
                _senderThread = new Thread("_senderThread for "+_uid) {

                        public void run() {
@@ -102,7 +97,7 @@
                                                }
                                                _sentPackets.setBit(packetNo, 
true);
                                                try {
-                                                       
((PeerNode)_destination).throttledSend(DMT.createPacketTransmit(_uid, packetNo, 
_sentPackets, _prb.getPacket(packetNo)));
+                                                       
((PeerNode)_destination).throttledSend(DMT.createPacketTransmit(_uid, packetNo, 
_sentPackets, _prb.getPacket(packetNo)), SEND_TIMEOUT);
                                                // We accelerate the ping rate 
during the transfer to keep a closer eye on round-trip-time
                                                sentSinceLastPing++;
                                                if (sentSinceLastPing >= 
PING_EVERY) {
@@ -112,14 +107,35 @@
                                                }
                                                } catch (NotConnectedException 
e) {
                                                    Logger.normal(this, 
"Terminating send: "+e);
-                                                   _sendComplete = true;
+                                                   synchronized(_senderThread) 
{
+                                                       _sendComplete = true;
+                                                       
_senderThread.notifyAll();
+                                                   }
                                                } catch (AbortedException e) {
                                                        Logger.normal(this, 
"Terminating send due to abort: "+e);
-                                                       _sendComplete = true;
+                                                       
synchronized(_senderThread) {
+                                                               _sendComplete = 
true;
+                                                               
_senderThread.notifyAll();
+                                                       }
+                                               } catch 
(ThrottledPacketLagException e) {
+                                                       Logger.error(this, 
"Terminating send due to overload: "+e);
+                                                       
synchronized(_senderThread) {
+                                                               
failedByOverload = true;
+                                                               _sendComplete = 
true;
+                                                               
_senderThread.notifyAll();
+                                                       }
                                                }
                                }
                        }
                };
+       }
+
+       public void sendAborted(int reason, String desc) throws 
NotConnectedException {
+               _usm.send(_destination, DMT.createSendAborted(_uid, reason, 
desc));
+       }
+       
+       public boolean send() {
+               _receiverThread = Thread.currentThread();

                try {
                _unsent = _prb.addListener(new 
PartiallyReceivedBlock.PacketReceivedListener() {;
@@ -226,4 +242,20 @@
         t.setDaemon(true);
         t.start();
     }
+
+       public void waitForComplete() {
+               synchronized(_senderThread) {
+                       while(!_sendComplete) {
+                               try {
+                               _senderThread.wait(10*1000);
+                               } catch (InterruptedException e) {
+                                       // Ignore
+                               }
+                       }
+               }
+       }
+
+       public boolean failedDueToOverload() {
+               return failedByOverload;
+       }
 }

Modified: trunk/freenet/src/freenet/node/InsertSender.java
===================================================================
--- trunk/freenet/src/freenet/node/InsertSender.java    2005-11-26 00:26:12 UTC 
(rev 7618)
+++ trunk/freenet/src/freenet/node/InsertSender.java    2005-11-26 00:55:28 UTC 
(rev 7619)
@@ -38,6 +38,7 @@
         this.closestLocation = closestLocation;
         this.startTime = System.currentTimeMillis();
         senderThreads = new LinkedList();
+        blockSenders = new LinkedList();
         Thread t = new Thread(this, "InsertSender for UID "+uid+" on 
"+node.portNumber+" at "+System.currentTimeMillis());
         t.setDaemon(true);
         t.start();
@@ -62,6 +63,7 @@
     final long startTime;
     private BlockTransmitter bt;
     private final LinkedList senderThreads;
+    private final LinkedList blockSenders;

     private int status = -1;
     static final int NOT_FINISHED = -1;
@@ -195,6 +197,7 @@
             senderThread.setDaemon(true);
             senderThread.start();
             senderThreads.add(senderThread);
+            blockSenders.add(bt);

             if(receiveFailed) return;
             try {
@@ -312,6 +315,15 @@
         Logger.minor(this, "Finished: "+code+" on "+this, new 
Exception("debug"));
         if(status != NOT_FINISHED)
                throw new IllegalStateException("finish() called with "+code+" 
when was already "+status);
+
+        for(Iterator i = blockSenders.iterator();i.hasNext();) {
+               BlockTransmitter bt = (BlockTransmitter) i.next();
+               bt.waitForComplete();
+               if(bt.failedDueToOverload() && (status == SUCCESS || status == 
ROUTE_NOT_FOUND)) {
+                       status = REJECTED_OVERLOAD;
+                       break;
+               }
+        }

         for(Iterator i = senderThreads.iterator();i.hasNext();) {
                Thread senderThread = (Thread) i.next();

Modified: trunk/freenet/src/freenet/node/PeerNode.java
===================================================================
--- trunk/freenet/src/freenet/node/PeerNode.java        2005-11-26 00:26:12 UTC 
(rev 7618)
+++ trunk/freenet/src/freenet/node/PeerNode.java        2005-11-26 00:55:28 UTC 
(rev 7619)
@@ -1027,7 +1027,7 @@
 //     }
        }

-       public void throttledSend(Message message) throws NotConnectedException 
{
-               node.globalThrottle.sendPacket(message, this);
+       public void throttledSend(Message message, long maxWaitTime) throws 
NotConnectedException, ThrottledPacketLagException {
+               node.globalThrottle.sendPacket(message, this, maxWaitTime);
        }
 }

Added: trunk/freenet/src/freenet/node/ThrottledPacketLagException.java
===================================================================
--- trunk/freenet/src/freenet/node/ThrottledPacketLagException.java     
2005-11-26 00:26:12 UTC (rev 7618)
+++ trunk/freenet/src/freenet/node/ThrottledPacketLagException.java     
2005-11-26 00:55:28 UTC (rev 7619)
@@ -0,0 +1,10 @@
+package freenet.node;
+
+/**
+ * Thrown when a throttled send is queued for too long.
+ * @author root
+ *
+ */
+public class ThrottledPacketLagException extends Exception {
+
+}

Modified: trunk/freenet/src/freenet/node/ThrottledPacketSender.java
===================================================================
--- trunk/freenet/src/freenet/node/ThrottledPacketSender.java   2005-11-26 
00:26:12 UTC (rev 7618)
+++ trunk/freenet/src/freenet/node/ThrottledPacketSender.java   2005-11-26 
00:55:28 UTC (rev 7619)
@@ -43,11 +43,18 @@
                RuntimeException re;
                Error err;

-               public void waitUntilSent() throws NotConnectedException {
+               public void waitUntilSent(long maxWaitTime) throws 
NotConnectedException, ThrottledPacketLagException {
+                       long startTime = System.currentTimeMillis();
+                       long waitEndTime = startTime + maxWaitTime;
                        synchronized(this) {
                                while(!(sent || lostConn || re != null || err 
!= null)) {
                                        try {
-                                               wait(10*1000);
+                                               long wait = waitEndTime - 
System.currentTimeMillis();
+                                               if(wait > 0)
+                                                       wait(10*1000);
+                                               if(wait <= 0) {
+                                                       throw new 
ThrottledPacketLagException();
+                                               }
                                        } catch (InterruptedException e) {
                                                // Ignore
                                        }
@@ -65,9 +72,9 @@
                }
        }

-       public void sendPacket(Message msg, PeerNode pn) throws 
NotConnectedException {
+       public void sendPacket(Message msg, PeerNode pn, long maxWaitTime) 
throws NotConnectedException, ThrottledPacketLagException {
                ThrottledPacket p = queuePacket(msg, pn);
-               p.waitUntilSent();
+               p.waitUntilSent(maxWaitTime);
        }

        private ThrottledPacket queuePacket(Message msg, PeerNode pn) throws 
NotConnectedException {

Modified: trunk/freenet/src/freenet/node/Version.java
===================================================================
--- trunk/freenet/src/freenet/node/Version.java 2005-11-26 00:26:12 UTC (rev 
7618)
+++ trunk/freenet/src/freenet/node/Version.java 2005-11-26 00:55:28 UTC (rev 
7619)
@@ -20,10 +20,10 @@
        public static final String protocolVersion = "1.0";

        /** The build number of the current revision */
-       public static final int buildNumber = 233;
+       public static final int buildNumber = 234;

        /** Oldest build of Fred we will talk to */
-       public static final int lastGoodBuild = 232;
+       public static final int lastGoodBuild = 234;

        /** The highest reported build of fred */
        public static int highestSeenBuild = buildNumber;


Reply via email to