Author: toad
Date: 2005-11-19 00:43:09 +0000 (Sat, 19 Nov 2005)
New Revision: 7569
Added:
trunk/freenet/src/freenet/node/ThrottledPacketSender.java
Modified:
trunk/freenet/src/freenet/io/xfer/BlockTransmitter.java
trunk/freenet/src/freenet/node/Node.java
trunk/freenet/src/freenet/node/PeerNode.java
trunk/freenet/src/freenet/node/RealNodePingTest.java
trunk/freenet/src/freenet/node/RealNodeRequestInsertTest.java
trunk/freenet/src/freenet/node/RealNodeRoutingTest.java
trunk/freenet/src/freenet/node/Version.java
Log:
198:
Bandwidth limiting.
Defaults to 15 data packets/sec.
But that can be overridden with the third command line argument.
Modified: trunk/freenet/src/freenet/io/xfer/BlockTransmitter.java
===================================================================
--- trunk/freenet/src/freenet/io/xfer/BlockTransmitter.java 2005-11-18
22:12:55 UTC (rev 7568)
+++ trunk/freenet/src/freenet/io/xfer/BlockTransmitter.java 2005-11-19
00:43:09 UTC (rev 7569)
@@ -105,7 +105,7 @@
}
_sentPackets.setBit(packetNo,
true);
try {
-
((PeerNode)_destination).sendAsync(DMT.createPacketTransmit(_uid, packetNo,
_sentPackets, _prb.getPacket(packetNo)), null);
+
((PeerNode)_destination).throttledSend(DMT.createPacketTransmit(_uid, packetNo,
_sentPackets, _prb.getPacket(packetNo)));
// We accelerate the ping rate
during the transfer to keep a closer eye on round-trip-time
sentSinceLastPing++;
if (sentSinceLastPing >=
PING_EVERY) {
Modified: trunk/freenet/src/freenet/node/Node.java
===================================================================
--- trunk/freenet/src/freenet/node/Node.java 2005-11-18 22:12:55 UTC (rev
7568)
+++ trunk/freenet/src/freenet/node/Node.java 2005-11-19 00:43:09 UTC (rev
7569)
@@ -260,8 +260,8 @@
*/
public static void main(String[] args) throws IOException {
int length = args.length;
- if (length < 1 || length > 2) {
- System.out.println("Usage: $ java freenet.node.Node
<portNumber> [ipOverride]");
+ if (length < 1 || length > 3) {
+ System.out.println("Usage: $ java freenet.node.Node
<portNumber> [ipOverride] [max data packets / second]");
return;
}
@@ -275,12 +275,16 @@
Logger.normal(Node.class, "Creating node...");
Yarrow yarrow = new Yarrow();
InetAddress overrideIP = null;
+ int packetsPerSecond = 15;
if(args.length > 1) {
overrideIP = InetAddress.getByName(args[1]);
System.err.println("Overriding IP detection:
"+overrideIP.getHostAddress());
+ if(args.length > 2) {
+ packetsPerSecond = Integer.parseInt(args[2]);
+ }
}
DiffieHellman.init(yarrow);
- Node n = new Node(port, yarrow, overrideIP, "");
+ Node n = new Node(port, yarrow, overrideIP, "", 1000 /
packetsPerSecond);
n.start(new StaticSwapRequestInterval(2000));
new TextModeClientInterface(n);
Thread t = new Thread(new MemoryChecker(), "Memory checker");
@@ -290,7 +294,7 @@
// FIXME - the whole overrideIP thing is a hack to avoid config
// Implement the config!
- Node(int port, RandomSource rand, InetAddress overrideIP, String prefix) {
+ Node(int port, RandomSource rand, InetAddress overrideIP, String prefix,
int throttleInterval) {
portNumber = port;
startupTime = System.currentTimeMillis();
recentlyCompletedIDs = new LRUQueue();
@@ -318,6 +322,8 @@
insertSenders = new HashMap();
runningUIDs = new HashSet();
+ globalThrottle = new ThrottledPacketSender(throttleInterval);
+
lm = new LocationManager(random);
try {
@@ -793,6 +799,8 @@
}
final LRUQueue recentlyCompletedIDs;
+
+ public final ThrottledPacketSender globalThrottle;
static final int MAX_RECENTLY_COMPLETED_IDS = 10*1000;
/**
Modified: trunk/freenet/src/freenet/node/PeerNode.java
===================================================================
--- trunk/freenet/src/freenet/node/PeerNode.java 2005-11-18 22:12:55 UTC
(rev 7568)
+++ trunk/freenet/src/freenet/node/PeerNode.java 2005-11-19 00:43:09 UTC
(rev 7569)
@@ -984,4 +984,8 @@
return 1.0;
}
}
+
+ public void throttledSend(Message message) throws NotConnectedException
{
+ node.globalThrottle.sendPacket(message, this);
+ }
}
Modified: trunk/freenet/src/freenet/node/RealNodePingTest.java
===================================================================
--- trunk/freenet/src/freenet/node/RealNodePingTest.java 2005-11-18
22:12:55 UTC (rev 7568)
+++ trunk/freenet/src/freenet/node/RealNodePingTest.java 2005-11-19
00:43:09 UTC (rev 7569)
@@ -24,8 +24,8 @@
Yarrow yarrow = new Yarrow();
DiffieHellman.init(yarrow);
// Create 2 nodes
- Node node1 = new Node(5001, yarrow, null, "pingtest-");
- Node node2 = new Node(5002, yarrow, null, "pingtest-");
+ Node node1 = new Node(5001, yarrow, null, "pingtest-", 0);
+ Node node2 = new Node(5002, yarrow, null, "pingtest-", 0);
SimpleFieldSet node1ref = node1.exportFieldSet();
SimpleFieldSet node2ref = node2.exportFieldSet();
// Connect
Modified: trunk/freenet/src/freenet/node/RealNodeRequestInsertTest.java
===================================================================
--- trunk/freenet/src/freenet/node/RealNodeRequestInsertTest.java
2005-11-18 22:12:55 UTC (rev 7568)
+++ trunk/freenet/src/freenet/node/RealNodeRequestInsertTest.java
2005-11-19 00:43:09 UTC (rev 7569)
@@ -41,7 +41,7 @@
Node[] nodes = new Node[NUMBER_OF_NODES];
Logger.normal(RealNodeRoutingTest.class, "Creating nodes...");
for(int i=0;i<NUMBER_OF_NODES;i++) {
- nodes[i] = new Node(5000+i, random, null, wd+File.separator);
+ nodes[i] = new Node(5000+i, random, null, wd+File.separator, 0);
nodes[i].usm.setDropProbability(20); // 5%
Logger.normal(RealNodeRoutingTest.class, "Created node "+i);
}
Modified: trunk/freenet/src/freenet/node/RealNodeRoutingTest.java
===================================================================
--- trunk/freenet/src/freenet/node/RealNodeRoutingTest.java 2005-11-18
22:12:55 UTC (rev 7568)
+++ trunk/freenet/src/freenet/node/RealNodeRoutingTest.java 2005-11-19
00:43:09 UTC (rev 7569)
@@ -35,7 +35,7 @@
Node[] nodes = new Node[NUMBER_OF_NODES];
Logger.normal(RealNodeRoutingTest.class, "Creating nodes...");
for(int i=0;i<NUMBER_OF_NODES;i++) {
- nodes[i] = new Node(5000+i, random, null, wd+File.separator);
+ nodes[i] = new Node(5000+i, random, null, wd+File.separator, 0);
Logger.normal(RealNodeRoutingTest.class, "Created node "+i);
}
SimpleFieldSet refs[] = new SimpleFieldSet[NUMBER_OF_NODES];
Added: trunk/freenet/src/freenet/node/ThrottledPacketSender.java
===================================================================
--- trunk/freenet/src/freenet/node/ThrottledPacketSender.java 2005-11-18
22:12:55 UTC (rev 7568)
+++ trunk/freenet/src/freenet/node/ThrottledPacketSender.java 2005-11-19
00:43:09 UTC (rev 7569)
@@ -0,0 +1,120 @@
+package freenet.node;
+
+import java.util.LinkedList;
+
+import freenet.io.comm.Message;
+import freenet.io.comm.NotConnectedException;
+import freenet.support.Logger;
+
+/**
+ * Sends throttled packets.
+ * Only data packets are throttled, and they are all
+ * slightly over 1kB. So we can throttle bandwidth by
+ * just sending no more than N of these per second.
+ * Initially a rather brutal implementation; we send one
+ * packet, every 1/N seconds, or we don't. That's it!
+ *
+ * A single daemon thread, started by the constructor, drains the
+ * queue. Callers of {@link #sendPacket(Message, PeerNode)} block
+ * until their packet has either been sent or been dropped.
+ */
+public class ThrottledPacketSender implements Runnable {
+
+	/** FIFO of ThrottledPacket objects awaiting transmission. Also used as the monitor guarding the queue. */
+	final LinkedList queuedPackets;
+	/** Milliseconds to pause after each successful send. Values <= 0 mean no pause. */
+	final int sleepTime;
+
+	/**
+	 * Creates the sender and starts its daemon thread.
+	 * @param sleepTime Milliseconds to sleep after each packet sent;
+	 * zero or negative disables the pause.
+	 */
+	public ThrottledPacketSender(int sleepTime) {
+		this.sleepTime = sleepTime;
+		queuedPackets = new LinkedList();
+		Thread t = new Thread(this, "Throttled packet sender");
+		t.setDaemon(true);
+		t.start();
+	}
+
+	/**
+	 * A queued message together with its destination and its
+	 * completion state. The packet object itself is the monitor the
+	 * queueing thread waits on.
+	 */
+	public class ThrottledPacket {
+		public ThrottledPacket(Message msg2, PeerNode pn2) {
+			this.msg = msg2;
+			this.pn = pn2;
+			sent = false;
+			dropped = false;
+			peerGone = false;
+		}
+
+		final Message msg;
+		final PeerNode pn;
+		/** True once the message has been handed to PeerNode.send() without an exception. */
+		boolean sent;
+		/** True once the sender thread has given up on this packet. */
+		boolean dropped;
+		/** True if the packet was dropped because the peer was not connected. */
+		boolean peerGone;
+
+		/**
+		 * Blocks until the sender thread has finished with this
+		 * packet, i.e. it was either sent or dropped. Check the
+		 * sent / dropped flags afterwards to tell which.
+		 */
+		public void waitUntilSent() {
+			synchronized(this) {
+				// Wake on either outcome; waiting only for "sent" would
+				// hang forever when the packet is dropped.
+				while(!sent && !dropped) {
+					try {
+						wait(10*1000);
+					} catch (InterruptedException e) {
+						// Ignore, we re-check the flags and keep waiting
+					}
+				}
+			}
+		}
+
+		/**
+		 * Records the outcome and wakes every thread blocked in
+		 * waitUntilSent().
+		 * @param wasSent True if the message was sent successfully.
+		 * @param disconnected True if it was dropped because the peer
+		 * is not connected. Ignored when wasSent is true.
+		 */
+		synchronized void finish(boolean wasSent, boolean disconnected) {
+			if(wasSent) {
+				sent = true;
+			} else {
+				dropped = true;
+				peerGone = disconnected;
+			}
+			notifyAll();
+		}
+
+		/** @return True if the packet was dropped because the peer was not connected. */
+		synchronized boolean droppedBecauseDisconnected() {
+			return dropped && peerGone;
+		}
+	}
+
+	/**
+	 * Queues a message for throttled delivery and blocks until the
+	 * sender thread has dealt with it.
+	 * If sending fails for a reason other than disconnection, the
+	 * error is logged by the sender thread and this method returns
+	 * normally.
+	 * @param msg The message to send.
+	 * @param pn The peer to send it to.
+	 * @throws NotConnectedException If the peer is not connected when
+	 * the packet is queued, or is found to be disconnected when the
+	 * sender thread gets to it.
+	 */
+	public void sendPacket(Message msg, PeerNode pn) throws NotConnectedException {
+		ThrottledPacket p = queuePacket(msg, pn);
+		p.waitUntilSent();
+		if(p.droppedBecauseDisconnected())
+			throw new NotConnectedException();
+	}
+
+	/**
+	 * Appends a packet to the queue and wakes the sender thread.
+	 * @throws NotConnectedException If the peer is not connected.
+	 */
+	private ThrottledPacket queuePacket(Message msg, PeerNode pn) throws NotConnectedException {
+		if(!pn.isConnected())
+			throw new NotConnectedException();
+		ThrottledPacket p = new ThrottledPacket(msg, pn);
+		synchronized(queuedPackets) {
+			queuedPackets.addLast(p);
+			queuedPackets.notify();
+		}
+		return p;
+	}
+
+	/**
+	 * Sender thread main loop: send one packet, pause for sleepTime,
+	 * repeat; when the queue is empty, wait until something is queued.
+	 */
+	public void run() {
+		while(true) {
+			if(sendThrottledPacket()) {
+				// Sent one
+				// Sleep
+				try {
+					if(sleepTime > 0)
+						Thread.sleep(sleepTime);
+				} catch (InterruptedException e) {
+					// Huh?
+				}
+			} else {
+				// Didn't send one
+				// Wait for one
+				synchronized(queuedPackets) {
+					while(queuedPackets.isEmpty())
+						try {
+							queuedPackets.wait(10*1000);
+						} catch (InterruptedException e) {
+							// Never mind
+						}
+				}
+			}
+
+		}
+	}
+
+	/**
+	 * Tries to send exactly one queued packet. Packets that cannot be
+	 * sent are dropped (their waiters are woken) and the next one is
+	 * tried, so a dead peer does not consume a send slot.
+	 * @return True if one packet was sent, false if the queue ran
+	 * empty without anything being sent.
+	 */
+	private boolean sendThrottledPacket() {
+		while(true) {
+			ThrottledPacket p;
+			synchronized(queuedPackets) {
+				if(queuedPackets.isEmpty()) return false;
+				p = (ThrottledPacket) queuedPackets.removeFirst();
+			}
+			if(!p.pn.isConnected()) {
+				// Release the waiter, otherwise it blocks forever
+				p.finish(false, true);
+				continue;
+			}
+			try {
+				p.pn.send(p.msg);
+			} catch (NotConnectedException e) {
+				p.finish(false, true);
+				continue;
+			} catch (Throwable t) {
+				Logger.error(this, "Caught "+t, t);
+				p.finish(false, false);
+				continue;
+			}
+			p.finish(true, false);
+			// Stop after one successful send so that run() sleeps;
+			// looping on here would drain the queue unthrottled.
+			return true;
+		}
+	}
+
+}
Modified: trunk/freenet/src/freenet/node/Version.java
===================================================================
--- trunk/freenet/src/freenet/node/Version.java 2005-11-18 22:12:55 UTC (rev
7568)
+++ trunk/freenet/src/freenet/node/Version.java 2005-11-19 00:43:09 UTC (rev
7569)
@@ -20,7 +20,7 @@
public static final String protocolVersion = "1.0";
/** The build number of the current revision */
- public static final int buildNumber = 197;
+ public static final int buildNumber = 198;
/** Oldest build of Fred we will talk to */
public static final int lastGoodBuild = 196;