Author: toad
Date: 2005-11-19 00:43:09 +0000 (Sat, 19 Nov 2005)
New Revision: 7569

Added:
   trunk/freenet/src/freenet/node/ThrottledPacketSender.java
Modified:
   trunk/freenet/src/freenet/io/xfer/BlockTransmitter.java
   trunk/freenet/src/freenet/node/Node.java
   trunk/freenet/src/freenet/node/PeerNode.java
   trunk/freenet/src/freenet/node/RealNodePingTest.java
   trunk/freenet/src/freenet/node/RealNodeRequestInsertTest.java
   trunk/freenet/src/freenet/node/RealNodeRoutingTest.java
   trunk/freenet/src/freenet/node/Version.java
Log:
198:
Bandwidth limiting.
Defaults to 15 data packets/sec; this can be overridden with the optional
third command-line argument (only read when the ipOverride argument is also given).

Modified: trunk/freenet/src/freenet/io/xfer/BlockTransmitter.java
===================================================================
--- trunk/freenet/src/freenet/io/xfer/BlockTransmitter.java     2005-11-18 
22:12:55 UTC (rev 7568)
+++ trunk/freenet/src/freenet/io/xfer/BlockTransmitter.java     2005-11-19 
00:43:09 UTC (rev 7569)
@@ -105,7 +105,7 @@
                                                }
                                                _sentPackets.setBit(packetNo, 
true);
                                                try {
-                                                       
((PeerNode)_destination).sendAsync(DMT.createPacketTransmit(_uid, packetNo, 
_sentPackets, _prb.getPacket(packetNo)), null);
+                                                       
((PeerNode)_destination).throttledSend(DMT.createPacketTransmit(_uid, packetNo, 
_sentPackets, _prb.getPacket(packetNo)));
                                                // We accelerate the ping rate 
during the transfer to keep a closer eye on round-trip-time
                                                sentSinceLastPing++;
                                                if (sentSinceLastPing >= 
PING_EVERY) {

Modified: trunk/freenet/src/freenet/node/Node.java
===================================================================
--- trunk/freenet/src/freenet/node/Node.java    2005-11-18 22:12:55 UTC (rev 
7568)
+++ trunk/freenet/src/freenet/node/Node.java    2005-11-19 00:43:09 UTC (rev 
7569)
@@ -260,8 +260,8 @@
      */
     public static void main(String[] args) throws IOException {
        int length = args.length;
-       if (length < 1 || length > 2) {
-               System.out.println("Usage: $ java freenet.node.Node 
<portNumber> [ipOverride]");
+       if (length < 1 || length > 3) {
+               System.out.println("Usage: $ java freenet.node.Node 
<portNumber> [ipOverride] [max data packets / second]");
                return;
        }

@@ -275,12 +275,16 @@
         Logger.normal(Node.class, "Creating node...");
         Yarrow yarrow = new Yarrow();
         InetAddress overrideIP = null;
+        int packetsPerSecond = 15;
         if(args.length > 1) {
             overrideIP = InetAddress.getByName(args[1]);
             System.err.println("Overriding IP detection: 
"+overrideIP.getHostAddress());
+            if(args.length > 2) {
+               packetsPerSecond = Integer.parseInt(args[2]);
+            }
         }
         DiffieHellman.init(yarrow);
-        Node n = new Node(port, yarrow, overrideIP, "");
+        Node n = new Node(port, yarrow, overrideIP, "", 1000 / 
packetsPerSecond);
         n.start(new StaticSwapRequestInterval(2000));
         new TextModeClientInterface(n);
         Thread t = new Thread(new MemoryChecker(), "Memory checker");
@@ -290,7 +294,7 @@

     // FIXME - the whole overrideIP thing is a hack to avoid config
     // Implement the config!
-    Node(int port, RandomSource rand, InetAddress overrideIP, String prefix) {
+    Node(int port, RandomSource rand, InetAddress overrideIP, String prefix, 
int throttleInterval) {
         portNumber = port;
         startupTime = System.currentTimeMillis();
         recentlyCompletedIDs = new LRUQueue();
@@ -318,6 +322,8 @@
         insertSenders = new HashMap();
         runningUIDs = new HashSet();

+        globalThrottle = new ThrottledPacketSender(throttleInterval);
+        
                lm = new LocationManager(random);

         try {
@@ -793,6 +799,8 @@
     }

     final LRUQueue recentlyCompletedIDs;
+
+       public final ThrottledPacketSender globalThrottle;
     static final int MAX_RECENTLY_COMPLETED_IDS = 10*1000;

     /**

Modified: trunk/freenet/src/freenet/node/PeerNode.java
===================================================================
--- trunk/freenet/src/freenet/node/PeerNode.java        2005-11-18 22:12:55 UTC 
(rev 7568)
+++ trunk/freenet/src/freenet/node/PeerNode.java        2005-11-19 00:43:09 UTC 
(rev 7569)
@@ -984,4 +984,8 @@
                return 1.0;
        }
        }
+
+       /**
+        * Sends a message to this peer through the node-wide packet throttle
+        * (node.globalThrottle) instead of transmitting it directly.
+        *
+        * @param message the message to hand to the throttle for this peer
+        * @throws NotConnectedException propagated from the throttle's
+        * sendPacket call
+        */
+       public void throttledSend(Message message) throws NotConnectedException 
{
+               node.globalThrottle.sendPacket(message, this);
+       }
 }

Modified: trunk/freenet/src/freenet/node/RealNodePingTest.java
===================================================================
--- trunk/freenet/src/freenet/node/RealNodePingTest.java        2005-11-18 
22:12:55 UTC (rev 7568)
+++ trunk/freenet/src/freenet/node/RealNodePingTest.java        2005-11-19 
00:43:09 UTC (rev 7569)
@@ -24,8 +24,8 @@
         Yarrow yarrow = new Yarrow();
         DiffieHellman.init(yarrow);
         // Create 2 nodes
-        Node node1 = new Node(5001, yarrow, null, "pingtest-");
-        Node node2 = new Node(5002, yarrow, null, "pingtest-");
+        Node node1 = new Node(5001, yarrow, null, "pingtest-", 0);
+        Node node2 = new Node(5002, yarrow, null, "pingtest-", 0);
         SimpleFieldSet node1ref = node1.exportFieldSet();
         SimpleFieldSet node2ref = node2.exportFieldSet();
         // Connect

Modified: trunk/freenet/src/freenet/node/RealNodeRequestInsertTest.java
===================================================================
--- trunk/freenet/src/freenet/node/RealNodeRequestInsertTest.java       
2005-11-18 22:12:55 UTC (rev 7568)
+++ trunk/freenet/src/freenet/node/RealNodeRequestInsertTest.java       
2005-11-19 00:43:09 UTC (rev 7569)
@@ -41,7 +41,7 @@
         Node[] nodes = new Node[NUMBER_OF_NODES];
         Logger.normal(RealNodeRoutingTest.class, "Creating nodes...");
         for(int i=0;i<NUMBER_OF_NODES;i++) {
-            nodes[i] = new Node(5000+i, random, null, wd+File.separator);
+            nodes[i] = new Node(5000+i, random, null, wd+File.separator, 0);
             nodes[i].usm.setDropProbability(20); // 5%
             Logger.normal(RealNodeRoutingTest.class, "Created node "+i);
         }

Modified: trunk/freenet/src/freenet/node/RealNodeRoutingTest.java
===================================================================
--- trunk/freenet/src/freenet/node/RealNodeRoutingTest.java     2005-11-18 
22:12:55 UTC (rev 7568)
+++ trunk/freenet/src/freenet/node/RealNodeRoutingTest.java     2005-11-19 
00:43:09 UTC (rev 7569)
@@ -35,7 +35,7 @@
         Node[] nodes = new Node[NUMBER_OF_NODES];
         Logger.normal(RealNodeRoutingTest.class, "Creating nodes...");
         for(int i=0;i<NUMBER_OF_NODES;i++) {
-            nodes[i] = new Node(5000+i, random, null, wd+File.separator);
+            nodes[i] = new Node(5000+i, random, null, wd+File.separator, 0);
             Logger.normal(RealNodeRoutingTest.class, "Created node "+i);
         }
         SimpleFieldSet refs[] = new SimpleFieldSet[NUMBER_OF_NODES];

Added: trunk/freenet/src/freenet/node/ThrottledPacketSender.java
===================================================================
--- trunk/freenet/src/freenet/node/ThrottledPacketSender.java   2005-11-18 
22:12:55 UTC (rev 7568)
+++ trunk/freenet/src/freenet/node/ThrottledPacketSender.java   2005-11-19 
00:43:09 UTC (rev 7569)
@@ -0,0 +1,120 @@
+package freenet.node;
+
+import java.util.LinkedList;
+
+import freenet.io.comm.Message;
+import freenet.io.comm.NotConnectedException;
+import freenet.support.Logger;
+
+/**
+ * Sends throttled packets.
+ * Only data packets are throttled, and they are all
+ * slightly over 1kB. So we can throttle bandwidth by
+ * just sending no more than N of these per second.
+ * Initially a rather brutal implementation; we send one
+ * packet, every 1/N seconds, or we don't. That's it!
+ * <p>
+ * Callers enqueue a packet with {@link #sendPacket} and block until the
+ * single daemon thread started by the constructor has either sent it
+ * or discarded it.
+ */
+public class ThrottledPacketSender implements Runnable {
+
+       /** FIFO of ThrottledPackets awaiting transmission; also used as the lock and wait/notify monitor for the queue. */
+       final LinkedList queuedPackets;
+       /** Milliseconds to pause after each transmitted packet; values <= 0 mean no pause. */
+       final int sleepTime;
+
+       /**
+        * Creates the sender and starts its daemon thread.
+        *
+        * @param sleepTime milliseconds to sleep after each packet that is
+        * actually sent; zero or negative disables the sleep
+        */
+       public ThrottledPacketSender(int sleepTime) {
+               this.sleepTime = sleepTime;
+               queuedPackets = new LinkedList();
+               Thread t = new Thread(this, "Throttled packet sender");
+               t.setDaemon(true);
+               t.start();
+       }
+
+       /**
+        * A queued message plus the completion state its submitter waits on.
+        * The state fields are guarded by the packet's own monitor.
+        */
+       public class ThrottledPacket {
+               public ThrottledPacket(Message msg2, PeerNode pn2) {
+                       this.msg = msg2;
+                       this.pn = pn2;
+                       sent = false;
+                       dropped = false;
+                       disconnected = false;
+               }
+
+               final Message msg;
+               final PeerNode pn;
+               /** True once the message has been passed to pn.send() without an exception. */
+               boolean sent;
+               /** True if the packet was discarded without being sent. */
+               boolean dropped;
+               /** True if the packet was discarded because the peer was not connected. */
+               boolean disconnected;
+
+               /**
+                * Blocks until the sender thread has finished with this packet,
+                * i.e. it has been sent or discarded. Interrupts are ignored.
+                */
+               public void waitUntilSent() {
+                       synchronized(this) {
+                               // Also stop waiting when the packet is dropped; otherwise the
+                               // caller would block forever on a packet that will never be sent.
+                               while(!sent && !dropped) {
+                                       try {
+                                               wait(10*1000);
+                                       } catch (InterruptedException e) {
+                                               // Ignore
+                                       }
+                               }
+                       }
+               }
+
+               /** Records a successful send and wakes any waiter. */
+               void markSent() {
+                       synchronized(this) {
+                               sent = true;
+                               notifyAll();
+                       }
+               }
+
+               /**
+                * Records that the packet was discarded and wakes any waiter.
+                * @param peerDisconnected true if the reason was a disconnected peer
+                */
+               void markDropped(boolean peerDisconnected) {
+                       synchronized(this) {
+                               dropped = true;
+                               disconnected = peerDisconnected;
+                               notifyAll();
+                       }
+               }
+       }
+
+       /**
+        * Queues a message for throttled delivery and blocks until the sender
+        * thread has sent or discarded it.
+        *
+        * @param msg the message to send
+        * @param pn the peer to send it to
+        * @throws NotConnectedException if the peer is not connected when the
+        * message is queued, or is found to be disconnected when its turn
+        * comes. A packet discarded because of some other error is logged by
+        * the sender thread and this method returns normally.
+        */
+       public void sendPacket(Message msg, PeerNode pn) throws NotConnectedException {
+               ThrottledPacket p = queuePacket(msg, pn);
+               p.waitUntilSent();
+               synchronized(p) {
+                       if(p.disconnected)
+                               throw new NotConnectedException();
+               }
+       }
+
+       /**
+        * Appends a packet to the queue and wakes the sender thread.
+        *
+        * @return the queued packet, for the caller to wait on
+        * @throws NotConnectedException if pn is not currently connected
+        */
+       private ThrottledPacket queuePacket(Message msg, PeerNode pn) throws NotConnectedException {
+               if(!pn.isConnected())
+                       throw new NotConnectedException();
+               ThrottledPacket p = new ThrottledPacket(msg, pn);
+               synchronized(queuedPackets) {
+                       queuedPackets.addLast(p);
+                       queuedPackets.notify();
+               }
+               return p;
+       }
+
+       /**
+        * Sender thread main loop: send one packet, sleep for sleepTime, repeat;
+        * when nothing could be sent, wait until the queue is non-empty.
+        */
+       public void run() {
+               while(true) {
+                       if(sendThrottledPacket()) {
+                               // Sent one
+                               // Sleep
+                               try {
+                                       if(sleepTime > 0)
+                                               Thread.sleep(sleepTime);
+                               } catch (InterruptedException e) {
+                                       // Huh?
+                               }
+                       } else {
+                               // Didn't send one
+                               // Wait for one
+                               synchronized(queuedPackets) {
+                                       while(queuedPackets.isEmpty()) {
+                                               try {
+                                                       queuedPackets.wait(10*1000);
+                                               } catch (InterruptedException e) {
+                                                       // Never mind
+                                               }
+                                       }
+                               }
+                       }
+               }
+       }
+
+       /**
+        * Takes packets off the queue until one is successfully sent or the
+        * queue is empty. Packets that cannot be sent are discarded, and their
+        * waiters released, without counting against the throttle.
+        *
+        * @return true if exactly one packet was sent, false if the queue ran
+        * out before anything could be sent
+        */
+       private boolean sendThrottledPacket() {
+               while(true) {
+                       ThrottledPacket p;
+                       synchronized(queuedPackets) {
+                               if(queuedPackets.isEmpty()) return false;
+                               p = (ThrottledPacket) queuedPackets.removeFirst();
+                       }
+                       if(!p.pn.isConnected()) {
+                               p.markDropped(true);
+                               continue;
+                       }
+                       try {
+                               p.pn.send(p.msg);
+                       } catch (NotConnectedException e) {
+                               p.markDropped(true);
+                               continue;
+                       } catch (Throwable t) {
+                               Logger.error(this, "Caught "+t, t);
+                               p.markDropped(false);
+                               continue;
+                       }
+                       p.markSent();
+                       // Return after a single successful send so that run() sleeps
+                       // before the next one; without this the queue is drained at
+                       // full speed and nothing is throttled.
+                       return true;
+               }
+       }
+
+}

Modified: trunk/freenet/src/freenet/node/Version.java
===================================================================
--- trunk/freenet/src/freenet/node/Version.java 2005-11-18 22:12:55 UTC (rev 
7568)
+++ trunk/freenet/src/freenet/node/Version.java 2005-11-19 00:43:09 UTC (rev 
7569)
@@ -20,7 +20,7 @@
        public static final String protocolVersion = "1.0";

        /** The build number of the current revision */
-       public static final int buildNumber = 197;
+       public static final int buildNumber = 198;

        /** Oldest build of Fred we will talk to */
        public static final int lastGoodBuild = 196;


Reply via email to