Author: toad
Date: 2005-11-30 21:27:36 +0000 (Wed, 30 Nov 2005)
New Revision: 7640

Removed:
   trunk/freenet/src/freenet/node/ThrottledPacketLagException.java
   trunk/freenet/src/freenet/node/ThrottledPacketSender.java
Modified:
   trunk/freenet/src/freenet/io/xfer/BlockReceiver.java
   trunk/freenet/src/freenet/io/xfer/BlockTransmitter.java
   trunk/freenet/src/freenet/node/Node.java
   trunk/freenet/src/freenet/node/NodePinger.java
   trunk/freenet/src/freenet/node/PeerNode.java
   trunk/freenet/src/freenet/node/Version.java
Log:
252: (mandatory)
Eliminate old throttled packet queue hard bandwidth limiter.
Implement new, simpler, ian-inspired hard bandwidth limiter (idea from 
ian/dijjer).
Also implement new, untested, currently unused, soft bandwidth limiter.
Take average total delay to send a block into account in load estimation.
Delete some dead code.
Also logging.


Modified: trunk/freenet/src/freenet/io/xfer/BlockReceiver.java
===================================================================
--- trunk/freenet/src/freenet/io/xfer/BlockReceiver.java        2005-11-30 
18:11:21 UTC (rev 7639)
+++ trunk/freenet/src/freenet/io/xfer/BlockReceiver.java        2005-11-30 
21:27:36 UTC (rev 7640)
@@ -74,6 +74,7 @@
                 Logger.normal(this, "Disconnected during receive: "+_uid+" 
from "+_sender);
                 throw new 
RetrievalException(RetrievalException.SENDER_DISCONNECTED);
             }
+            Logger.minor(this, "Received "+m1);
             if ((m1 != null) && m1.getSpec().equals(DMT.sendAborted)) {
                                _prb.abort(m1.getInt(DMT.REASON), 
m1.getString(DMT.DESCRIPTION));
                                throw new 
RetrievalException(m1.getInt(DMT.REASON), m1.getString(DMT.DESCRIPTION));
@@ -96,8 +97,7 @@
                                                Long resendTime = (Long) 
_recentlyReportedMissingPackets.get(new Integer(x));
                                                if ((resendTime == null) || 
(System.currentTimeMillis() > resendTime.longValue())) {
                                                        // Make a note of the 
earliest time we should resend this, based on the number of other
-                                                       // packets we
-                                                       // are already waiting 
for
+                                                       // packets we are 
already waiting for
                                                        long resendWait = 
System.currentTimeMillis()
                                                                        + 
(MAX_ROUND_TRIP_TIME + (_recentlyReportedMissingPackets.size() * 
MAX_SEND_INTERVAL));
                                                        
_recentlyReportedMissingPackets.put(new Integer(x), (new Long(resendWait)));
@@ -105,6 +105,7 @@
                                                }
                                        }
                                }
+                               Logger.minor(this, "Missing: "+missing.size());
                                if (missing.size() > 0) {
                                        Message mn = 
DMT.createMissingPacketNotification(_uid, missing);
                                        _usm.send(_sender, mn);

Modified: trunk/freenet/src/freenet/io/xfer/BlockTransmitter.java
===================================================================
--- trunk/freenet/src/freenet/io/xfer/BlockTransmitter.java     2005-11-30 
18:11:21 UTC (rev 7639)
+++ trunk/freenet/src/freenet/io/xfer/BlockTransmitter.java     2005-11-30 
21:27:36 UTC (rev 7640)
@@ -29,7 +29,6 @@
 import freenet.io.comm.PeerContext;
 import freenet.io.comm.UdpSocketManager;
 import freenet.node.PeerNode;
-import freenet.node.ThrottledPacketLagException;
 import freenet.support.BitArray;
 import freenet.support.Logger;

@@ -51,7 +50,48 @@
        BitArray _sentPackets;
        boolean failedByOverload = false;
        final PacketThrottle throttle;
+       
+       // Static stuff for global bandwidth limiter
+       /** Synchronization object for bandwidth limiting */
+       static final Object lastPacketSendTimeSync = new Object();
+       /** Time at which the last known packet is scheduled to be sent.
+        * We will not send another packet until at least minPacketDelay ms 
after this time. */
+       static long hardLastPacketSendTime = System.currentTimeMillis();
+       /** Minimum interval between packet sends, for overall hard bandwidth 
limiter */
+       static long minPacketDelay = 0;
+       /** Minimum average interval between packet sends, for averaged (soft) 
overall
+        * bandwidth usage limiter. */
+       static long minSoftDelay = 0;
+       /** "Soft" equivalent to hardLastPacketSendTime. Can lag up to half the 
softLimitPeriod
+        * behind the current time. Otherwise is similar. This gives it 
flexibility; we can have
+        * spurts above the average limit, but over the softLimitPeriod, it 
will average out to 
+        * the target minSoftDelay. */
+       static long softLastPacketSendTime = System.currentTimeMillis();
+       /** Period over which the soft limiter should work */
+       static long softLimitPeriod;

+       public static void setMinPacketInterval(int delay) {
+               synchronized(lastPacketSendTimeSync) {
+                       minPacketDelay = delay;
+               }
+       }
+       
+       public static void setSoftMinPacketInterval(int delay) {
+               synchronized(lastPacketSendTimeSync) {
+                       minSoftDelay = delay;
+               }
+       }
+       
+       public static void setSoftLimitPeriod(long period) {
+               synchronized(lastPacketSendTimeSync) {
+                       softLimitPeriod = period;
+                       long now = System.currentTimeMillis();
+                       if(now - softLastPacketSendTime > period / 2) {
+                               softLastPacketSendTime = now - (period / 2);
+                       }
+               }
+       }
+       
        public BlockTransmitter(UdpSocketManager usm, PeerContext destination, 
long uid, PartiallyReceivedBlock source) {
                _usm = usm;
                _destination = destination;
@@ -69,18 +109,8 @@
                        public void run() {
                                int sentSinceLastPing = 0;
                                while (!_sendComplete) {
-                                               long delay = 
throttle.getDelay();
-                                               long waitUntil = 
System.currentTimeMillis() + delay;
-                                               Logger.minor(this, "Waiting for 
"+delay+" ms for "+_uid+" : "+throttle);
+                                               long startCycleTime = 
System.currentTimeMillis();
                                                try {
-                                                       while (waitUntil > 
System.currentTimeMillis()) {
-                                                               
if(_sendComplete) return;
-                                                               synchronized 
(_senderThread) {
-                                                                       long x 
= waitUntil - System.currentTimeMillis();
-                                                                       if(x > 
0)
-                                                                               
_senderThread.wait(x);
-                                                               }
-                                                       }
                                                        while (true) {
                                                                
synchronized(_unsent) {
                                                                        
if(_unsent.size() != 0) break;
@@ -91,13 +121,24 @@
                                                                }
                                                        }
                                                } catch (InterruptedException 
e) {  }
+                                               long startDelayTime = 
System.currentTimeMillis();
+                                               delay(startCycleTime);
                                                int packetNo;
                                                synchronized(_unsent) {
                                                        packetNo = ((Integer) 
_unsent.removeFirst()).intValue();
                                                }
                                                _sentPackets.setBit(packetNo, 
true);
                                                try {
-                                                       
((PeerNode)_destination).throttledSend(DMT.createPacketTransmit(_uid, packetNo, 
_sentPackets, _prb.getPacket(packetNo)), SEND_TIMEOUT);
+                                                       long endDelayTime = 
System.currentTimeMillis();
+                                                       long delayTime = 
endDelayTime - startDelayTime;
+                                                       
((PeerNode)_destination).reportThrottledPacketSendTime(delayTime);
+                                                       
((PeerNode)_destination).sendAsync(DMT.createPacketTransmit(_uid, packetNo, 
_sentPackets, _prb.getPacket(packetNo)), null);
+                                                       // May have been delays 
in sending, so update to avoid sending more frequently than allowed
+                                                       long now = 
System.currentTimeMillis();
+                                                       
synchronized(lastPacketSendTimeSync) {
+                                                               
if(hardLastPacketSendTime < now)
+                                                                       
hardLastPacketSendTime = now;
+                                                       }
                                                // We accelerate the ping rate 
during the transfer to keep a closer eye on round-trip-time
                                                sentSinceLastPing++;
                                                if (sentSinceLastPing >= 
PING_EVERY) {
@@ -117,14 +158,78 @@
                                                                _sendComplete = 
true;
                                                                
_senderThread.notifyAll();
                                                        }
-                                               } catch 
(ThrottledPacketLagException e) {
-                                                       Logger.error(this, 
"Terminating send due to overload: "+e);
-                                                       
synchronized(_senderThread) {
-                                                               
failedByOverload = true;
-                                                               _sendComplete = 
true;
-                                                               
_senderThread.notifyAll();
+                                               }
+                               }
+                       }
+
+                       /** @return True if _sendComplete */
+                       private boolean delay(long startCycleTime) {
+                               
+                               // Get the current inter-packet delay
+                               long delay = throttle.getDelay();
+                               
+                               while(true) {
+                                       
+                                       long now = System.currentTimeMillis();
+                                       
+                                       long endTime = -1;
+                                       
+                                       boolean thenSend = true;
+                                       
+                                       // Synchronize on the static lock, and 
update
+                                       synchronized(lastPacketSendTimeSync) {
+                                               
+                                               // Get the current time
+                                               now = 
System.currentTimeMillis();
+                                               
+                                               // Update time if necessary to 
avoid spurts
+                                               if(hardLastPacketSendTime < 
(now - minPacketDelay))
+                                                       hardLastPacketSendTime 
= now - minPacketDelay;
+                                               
+                                               // Wait until the next send 
window
+                                               long newHardLastPacketSendTime =
+                                                       hardLastPacketSendTime 
+ minPacketDelay;
+                                               
+                                               long earliestSendTime = 
startCycleTime + delay;
+                                               
+                                               if(earliestSendTime > 
hardLastPacketSendTime) {
+                                                       // Don't clog up other 
senders!
+                                                       thenSend = false;
+                                                       endTime = 
earliestSendTime;
+                                               } else {
+                                                       hardLastPacketSendTime 
= newHardLastPacketSendTime;
+                                                       endTime = 
hardLastPacketSendTime;
+                                                       
+                                                       // What about the soft 
limit?
+                                                       
+                                                       if(now - 
softLastPacketSendTime > minSoftDelay / 2) {
+                                                               
softLastPacketSendTime = now - (minSoftDelay / 2);
                                                        }
+                                                       
+                                                       softLastPacketSendTime 
+= minSoftDelay;
+                                                       
+                                                       
if(softLastPacketSendTime > hardLastPacketSendTime) {
+                                                               endTime = 
hardLastPacketSendTime = softLastPacketSendTime;
+                                                       }
                                                }
+                                       }
+                                       
+                                       while(now < endTime) {
+                                               synchronized(_senderThread) {
+                                                       if(_sendComplete)
+                                                               return true;
+                                                       try {
+                                                               
_senderThread.wait(endTime - now);
+                                                       } catch 
(InterruptedException e) {
+                                                               // Ignore
+                                                       }
+                                               }
+                                               if(_sendComplete)
+                                                       return true;
+                                               now = 
System.currentTimeMillis();
+                                       }
+                                       
+                                       if(thenSend) return false;
                                }
                        }
                };

Modified: trunk/freenet/src/freenet/node/Node.java
===================================================================
--- trunk/freenet/src/freenet/node/Node.java    2005-11-30 18:11:21 UTC (rev 
7639)
+++ trunk/freenet/src/freenet/node/Node.java    2005-11-30 21:27:36 UTC (rev 
7640)
@@ -37,6 +37,7 @@
 import freenet.io.comm.PeerParseException;
 import freenet.io.comm.UdpSocketManager;
 import freenet.io.xfer.AbortedException;
+import freenet.io.xfer.BlockTransmitter;
 import freenet.io.xfer.PartiallyReceivedBlock;
 import freenet.keys.CHKBlock;
 import freenet.keys.CHKVerifyException;
@@ -330,8 +331,12 @@
         insertSenders = new HashMap();
         runningUIDs = new HashSet();

-        globalThrottle = new ThrottledPacketSender(throttleInterval);
+        BlockTransmitter.setMinPacketInterval(throttleInterval);

+        // FIXME test the soft limit
+        BlockTransmitter.setSoftLimitPeriod(14*24*60*60*1000);
+        BlockTransmitter.setSoftMinPacketInterval(0);
+        
                lm = new LocationManager(random);

         try {
@@ -866,7 +871,6 @@

     final LRUQueue recentlyCompletedIDs;

-       public final ThrottledPacketSender globalThrottle;
     static final int MAX_RECENTLY_COMPLETED_IDS = 10*1000;

     /**

Modified: trunk/freenet/src/freenet/node/NodePinger.java
===================================================================
--- trunk/freenet/src/freenet/node/NodePinger.java      2005-11-30 18:11:21 UTC 
(rev 7639)
+++ trunk/freenet/src/freenet/node/NodePinger.java      2005-11-30 21:27:36 UTC 
(rev 7640)
@@ -48,7 +48,10 @@
                        PeerNode peer = peers[i];
                        if(!peer.isConnected()) continue;
                        peerCount++;
-                       total *= peer.averagePingTime();
+                       double avgPingTime = peer.averagePingTime();
+                       double avgThrottledPacketSendTime = 
peer.throttledPacketSendAverage.currentValue();
+                       double value = Math.max(avgPingTime, 
avgThrottledPacketSendTime);
+                       total *= value;
                }
                if(peerCount > 0) {
                        total = Math.pow(total, 1.0 / peerCount);

Modified: trunk/freenet/src/freenet/node/PeerNode.java
===================================================================
--- trunk/freenet/src/freenet/node/PeerNode.java        2005-11-30 18:11:21 UTC 
(rev 7639)
+++ trunk/freenet/src/freenet/node/PeerNode.java        2005-11-30 21:27:36 UTC 
(rev 7640)
@@ -152,31 +152,6 @@
     /** The time at which we last completed a connection setup. */
     private long connectedTime;

-    /** The overall probability of the node rejecting a request or insert
-     * because of overload or timeout.
-     */
-    private final RunningAverage pRejectOverload;
-    
-    /** The probability of the node rejecting a data request because of
-     * overload, or of it timing out etc.
-     */
-    private final RunningAverage pDataRequestRejectOverload;
-    
-    /** The probability of the node rejecting an insert because of
-     * overload, timing out, etc.
-     */
-    private final RunningAverage pInsertRejectOverload;
-    
-    private final Object biasLock = new Object();
-    
-    private double biasValue = 1.0;
-    
-    /** Sensitivity of bias computations */
-    private double BIAS_SENSITIVITY = 0.01;
-    
-    /** Target pRO for bias computations */
-    private double BIAS_TARGET = 0.05;
-    
     /**
      * Create a PeerNode from a SimpleFieldSet containing a
      * node reference for one. This must contain the following
@@ -269,11 +244,9 @@
         decrementHTLAtMinimum = node.random.nextFloat() < 
Node.DECREMENT_AT_MIN_PROB;

         // FIXME maybe a simple binary RA would be better?
-        pDataRequestRejectOverload = new SimpleRunningAverage(100, 0.05);
-        pInsertRejectOverload = new SimpleRunningAverage(100, 0.05);
-        pRejectOverload = new SimpleRunningAverage(100, 0.05);
         pingNumber = node.random.nextLong();
         pingAverage = new SimpleRunningAverage(20, 1);
+        throttledPacketSendAverage = new SimpleRunningAverage(20, 1);
     }

     private void randomizeMaxTimeBetweenPacketSends() {
@@ -952,10 +925,6 @@
         return hashCode;
     }

-       public void throttledSend(Message message, long maxWaitTime) throws 
NotConnectedException, ThrottledPacketLagException {
-               node.globalThrottle.sendPacket(message, this, maxWaitTime);
-       }
-
        private final Object backoffSync = new Object();

        public boolean isBackedOff() {
@@ -1005,6 +974,7 @@
        final LRUHashtable pingsSentTimes = new LRUHashtable();
        long pingNumber;
        final RunningAverage pingAverage;
+       final RunningAverage throttledPacketSendAverage;

        public void sendPing() {
                long pingNo;
@@ -1054,4 +1024,8 @@
        public double averagePingTime() {
                return pingAverage.currentValue();
        }
+
+       public void reportThrottledPacketSendTime(long timeDiff) {
+               throttledPacketSendAverage.report(timeDiff);
+       }
 }

Deleted: trunk/freenet/src/freenet/node/ThrottledPacketLagException.java
===================================================================
--- trunk/freenet/src/freenet/node/ThrottledPacketLagException.java     
2005-11-30 18:11:21 UTC (rev 7639)
+++ trunk/freenet/src/freenet/node/ThrottledPacketLagException.java     
2005-11-30 21:27:36 UTC (rev 7640)
@@ -1,10 +0,0 @@
-package freenet.node;
-
-/**
- * Thrown when a throttled send is queued for too long.
- * @author root
- *
- */
-public class ThrottledPacketLagException extends Exception {
-
-}

Deleted: trunk/freenet/src/freenet/node/ThrottledPacketSender.java
===================================================================
--- trunk/freenet/src/freenet/node/ThrottledPacketSender.java   2005-11-30 
18:11:21 UTC (rev 7639)
+++ trunk/freenet/src/freenet/node/ThrottledPacketSender.java   2005-11-30 
21:27:36 UTC (rev 7640)
@@ -1,161 +0,0 @@
-package freenet.node;
-
-import java.util.LinkedList;
-
-import freenet.io.comm.Message;
-import freenet.io.comm.NotConnectedException;
-import freenet.support.Logger;
-
-/**
- * Sends throttled packets.
- * Only data packets are throttled, and they are all
- * slightly over 1kB. So we can throttle bandwidth by
- * just sending no more than N of these per second.
- * Initially a rather brutal implementation; we send one
- * packet, every 1/N seconds, or we don't. That's it!
- */
-public class ThrottledPacketSender implements Runnable {
-
-       final LinkedList queuedPackets;
-       final int sleepTime;
-       
-       public ThrottledPacketSender(int sleepTime) {
-               this.sleepTime = sleepTime;
-               queuedPackets = new LinkedList();
-               Thread t = new Thread(this, "Throttled packet sender");
-               t.setDaemon(true);
-               t.start();
-       }
-       
-       public class ThrottledPacket {
-               public ThrottledPacket(Message msg2, PeerNode pn2) {
-                       this.msg = msg2;
-                       this.pn = pn2;
-                       sent = false;
-                       queuedTime = System.currentTimeMillis();
-               }
-               
-               final Message msg;
-               final PeerNode pn;
-               final long queuedTime;
-               boolean sent;
-               boolean lostConn;
-               RuntimeException re;
-               Error err;
-               
-               public void waitUntilSent(long maxWaitTime) throws 
NotConnectedException, ThrottledPacketLagException {
-                       long startTime = System.currentTimeMillis();
-                       long waitEndTime = startTime + maxWaitTime;
-                       synchronized(this) {
-                               while(!(sent || lostConn || re != null || err 
!= null)) {
-                                       try {
-                                               long wait = waitEndTime - 
System.currentTimeMillis();
-                                               if(wait > 0)
-                                                       wait(10*1000);
-                                               if(wait <= 0) {
-                                                       throw new 
ThrottledPacketLagException();
-                                               }
-                                       } catch (InterruptedException e) {
-                                               // Ignore
-                                       }
-                               }
-                               if(lostConn) throw new NotConnectedException();
-                               if(re != null) throw re;
-                               if(err != null) throw err;
-                               long timeDiff = System.currentTimeMillis() - 
queuedTime;
-                               if(timeDiff > 30*1000)
-                                       Logger.error(this, "Took "+timeDiff+" 
ms to send packet "+msg+" to "+pn);
-                               else
-                                       Logger.minor(this, "Took "+timeDiff+" 
ms to send packet "+msg+" to "+pn);
-                               
-                       }
-               }
-       }
-
-       public void sendPacket(Message msg, PeerNode pn, long maxWaitTime) 
throws NotConnectedException, ThrottledPacketLagException {
-               ThrottledPacket p = queuePacket(msg, pn);
-               p.waitUntilSent(maxWaitTime);
-       }
-
-       private ThrottledPacket queuePacket(Message msg, PeerNode pn) throws 
NotConnectedException {
-               if(!pn.isConnected())
-                       throw new NotConnectedException();
-               ThrottledPacket p = new ThrottledPacket(msg, pn);
-               synchronized(queuedPackets) {
-                       queuedPackets.addLast(p);
-                       queuedPackets.notifyAll();
-               }
-               return p;
-       }
-
-       public void run() {
-               while(true) {
-                       if(sendThrottledPacket()) {
-                               // Sent one
-                               // Sleep
-                               try {
-                                       if(sleepTime > 0)
-                                               Thread.sleep(sleepTime);
-                               } catch (InterruptedException e) {
-                                       // Huh?
-                               }
-                       } else {
-                               // Didn't send one
-                               // Wait for one
-                               synchronized(queuedPackets) {
-                                       while(queuedPackets.isEmpty())
-                                               try {
-                                                       
queuedPackets.wait(10*1000);
-                                               } catch (InterruptedException 
e) {
-                                                       // Never mind
-                                               }
-                               }
-                       }
-                       
-               }
-       }
-
-       private boolean sendThrottledPacket() {
-               while(true) {
-                       ThrottledPacket p;
-                       synchronized(queuedPackets) {
-                               if(queuedPackets.isEmpty()) return false;
-                               p = (ThrottledPacket) 
queuedPackets.removeFirst();
-                       }
-                       if(!p.pn.isConnected()) {
-                               synchronized(p) {
-                                       p.lostConn = true;
-                                       p.notifyAll();
-                               }
-                               continue;
-                       }
-                       try {
-                               p.pn.send(p.msg);
-                       } catch (NotConnectedException e) {
-                               synchronized(p) {
-                                       p.lostConn = true;
-                                       p.notifyAll();
-                               }
-                               continue;
-                       } catch (RuntimeException e) {
-                               synchronized(p) {
-                                       p.re = e;
-                                       p.notifyAll();
-                               }
-                               return true;
-                       } catch (Error e) {
-                               synchronized(p) {
-                                       p.err = e;
-                                       p.notifyAll();
-                               }
-                               return true;
-                       }
-                       synchronized(p) {
-                               p.sent = true;
-                               p.notifyAll();
-                       }
-                       return true;
-               }
-       }
-
-}

Modified: trunk/freenet/src/freenet/node/Version.java
===================================================================
--- trunk/freenet/src/freenet/node/Version.java 2005-11-30 18:11:21 UTC (rev 
7639)
+++ trunk/freenet/src/freenet/node/Version.java 2005-11-30 21:27:36 UTC (rev 
7640)
@@ -20,10 +20,10 @@
        public static final String protocolVersion = "1.0";

        /** The build number of the current revision */
-       public static final int buildNumber = 251;
+       public static final int buildNumber = 252;

        /** Oldest build of Fred we will talk to */
-       public static final int lastGoodBuild = 251;
+       public static final int lastGoodBuild = 252;

        /** The highest reported build of fred */
        public static int highestSeenBuild = buildNumber;


Reply via email to