Author: toad
Date: 2005-11-26 00:55:28 +0000 (Sat, 26 Nov 2005)
New Revision: 7619
Added:
trunk/freenet/src/freenet/node/ThrottledPacketLagException.java
Modified:
trunk/freenet/src/freenet/io/xfer/BlockTransmitter.java
trunk/freenet/src/freenet/node/InsertSender.java
trunk/freenet/src/freenet/node/PeerNode.java
trunk/freenet/src/freenet/node/ThrottledPacketSender.java
trunk/freenet/src/freenet/node/Version.java
Log:
234: (mandatory)
Don't allow block transmitter packets to be queued for more than 30 seconds.
Cause a reject:overload if they are.
Modified: trunk/freenet/src/freenet/io/xfer/BlockTransmitter.java
===================================================================
--- trunk/freenet/src/freenet/io/xfer/BlockTransmitter.java 2005-11-26
00:26:12 UTC (rev 7618)
+++ trunk/freenet/src/freenet/io/xfer/BlockTransmitter.java 2005-11-26
00:55:28 UTC (rev 7619)
@@ -29,6 +29,7 @@
import freenet.io.comm.PeerContext;
import freenet.io.comm.UdpSocketManager;
import freenet.node.PeerNode;
+import freenet.node.ThrottledPacketLagException;
import freenet.support.BitArray;
import freenet.support.Logger;
@@ -48,6 +49,8 @@
LinkedList _unsent;
Thread _receiverThread, _senderThread;
BitArray _sentPackets;
+ boolean failedByOverload = false;
+ final PacketThrottle throttle;
public BlockTransmitter(UdpSocketManager usm, PeerContext destination,
long uid, PartiallyReceivedBlock source) {
_usm = usm;
@@ -60,15 +63,7 @@
Logger.error(this, "Aborted during setup");
// Will throw on running
}
- }
-
- public void sendAborted(int reason, String desc) throws
NotConnectedException {
- _usm.send(_destination, DMT.createSendAborted(_uid, reason,
desc));
- }
-
- public boolean send() {
- final PacketThrottle throttle =
PacketThrottle.getThrottle(_destination.getPeer(), _prb._packetSize);
- _receiverThread = Thread.currentThread();
+ throttle = PacketThrottle.getThrottle(_destination.getPeer(),
_prb._packetSize);
_senderThread = new Thread("_senderThread for "+_uid) {
public void run() {
@@ -102,7 +97,7 @@
}
_sentPackets.setBit(packetNo,
true);
try {
-
((PeerNode)_destination).throttledSend(DMT.createPacketTransmit(_uid, packetNo,
_sentPackets, _prb.getPacket(packetNo)));
+
((PeerNode)_destination).throttledSend(DMT.createPacketTransmit(_uid, packetNo,
_sentPackets, _prb.getPacket(packetNo)), SEND_TIMEOUT);
// We accelerate the ping rate
during the transfer to keep a closer eye on round-trip-time
sentSinceLastPing++;
if (sentSinceLastPing >=
PING_EVERY) {
@@ -112,14 +107,35 @@
}
} catch (NotConnectedException
e) {
Logger.normal(this,
"Terminating send: "+e);
- _sendComplete = true;
+ synchronized(_senderThread)
{
+ _sendComplete = true;
+
_senderThread.notifyAll();
+ }
} catch (AbortedException e) {
Logger.normal(this,
"Terminating send due to abort: "+e);
- _sendComplete = true;
+
synchronized(_senderThread) {
+ _sendComplete =
true;
+
_senderThread.notifyAll();
+ }
+ } catch
(ThrottledPacketLagException e) {
+ Logger.error(this,
"Terminating send due to overload: "+e);
+
synchronized(_senderThread) {
+
failedByOverload = true;
+ _sendComplete =
true;
+
_senderThread.notifyAll();
+ }
}
}
}
};
+ }
+
+ public void sendAborted(int reason, String desc) throws
NotConnectedException {
+ _usm.send(_destination, DMT.createSendAborted(_uid, reason,
desc));
+ }
+
+ public boolean send() {
+ _receiverThread = Thread.currentThread();
try {
_unsent = _prb.addListener(new
PartiallyReceivedBlock.PacketReceivedListener() {;
@@ -226,4 +242,20 @@
t.setDaemon(true);
t.start();
}
+
+ public void waitForComplete() {
+ synchronized(_senderThread) {
+ while(!_sendComplete) {
+ try {
+ _senderThread.wait(10*1000);
+ } catch (InterruptedException e) {
+ // Ignore
+ }
+ }
+ }
+ }
+
+ public boolean failedDueToOverload() {
+ return failedByOverload;
+ }
}
Modified: trunk/freenet/src/freenet/node/InsertSender.java
===================================================================
--- trunk/freenet/src/freenet/node/InsertSender.java 2005-11-26 00:26:12 UTC
(rev 7618)
+++ trunk/freenet/src/freenet/node/InsertSender.java 2005-11-26 00:55:28 UTC
(rev 7619)
@@ -38,6 +38,7 @@
this.closestLocation = closestLocation;
this.startTime = System.currentTimeMillis();
senderThreads = new LinkedList();
+ blockSenders = new LinkedList();
Thread t = new Thread(this, "InsertSender for UID "+uid+" on
"+node.portNumber+" at "+System.currentTimeMillis());
t.setDaemon(true);
t.start();
@@ -62,6 +63,7 @@
final long startTime;
private BlockTransmitter bt;
private final LinkedList senderThreads;
+ private final LinkedList blockSenders;
private int status = -1;
static final int NOT_FINISHED = -1;
@@ -195,6 +197,7 @@
senderThread.setDaemon(true);
senderThread.start();
senderThreads.add(senderThread);
+ blockSenders.add(bt);
if(receiveFailed) return;
try {
@@ -312,6 +315,15 @@
Logger.minor(this, "Finished: "+code+" on "+this, new
Exception("debug"));
if(status != NOT_FINISHED)
throw new IllegalStateException("finish() called with "+code+"
when was already "+status);
+
+ for(Iterator i = blockSenders.iterator();i.hasNext();) {
+ BlockTransmitter bt = (BlockTransmitter) i.next();
+ bt.waitForComplete();
+ if(bt.failedDueToOverload() && (status == SUCCESS || status ==
ROUTE_NOT_FOUND)) {
+ status = REJECTED_OVERLOAD;
+ break;
+ }
+ }
for(Iterator i = senderThreads.iterator();i.hasNext();) {
Thread senderThread = (Thread) i.next();
Modified: trunk/freenet/src/freenet/node/PeerNode.java
===================================================================
--- trunk/freenet/src/freenet/node/PeerNode.java 2005-11-26 00:26:12 UTC
(rev 7618)
+++ trunk/freenet/src/freenet/node/PeerNode.java 2005-11-26 00:55:28 UTC
(rev 7619)
@@ -1027,7 +1027,7 @@
// }
}
- public void throttledSend(Message message) throws NotConnectedException
{
- node.globalThrottle.sendPacket(message, this);
+ public void throttledSend(Message message, long maxWaitTime) throws
NotConnectedException, ThrottledPacketLagException {
+ node.globalThrottle.sendPacket(message, this, maxWaitTime);
}
}
Added: trunk/freenet/src/freenet/node/ThrottledPacketLagException.java
===================================================================
--- trunk/freenet/src/freenet/node/ThrottledPacketLagException.java
2005-11-26 00:26:12 UTC (rev 7618)
+++ trunk/freenet/src/freenet/node/ThrottledPacketLagException.java
2005-11-26 00:55:28 UTC (rev 7619)
@@ -0,0 +1,10 @@
+package freenet.node;
+
+/**
+ * Thrown when a throttled send is queued for too long.
+ * @author root
+ *
+ */
+public class ThrottledPacketLagException extends Exception {
+
+}
Modified: trunk/freenet/src/freenet/node/ThrottledPacketSender.java
===================================================================
--- trunk/freenet/src/freenet/node/ThrottledPacketSender.java 2005-11-26
00:26:12 UTC (rev 7618)
+++ trunk/freenet/src/freenet/node/ThrottledPacketSender.java 2005-11-26
00:55:28 UTC (rev 7619)
@@ -43,11 +43,18 @@
RuntimeException re;
Error err;
- public void waitUntilSent() throws NotConnectedException {
+ public void waitUntilSent(long maxWaitTime) throws
NotConnectedException, ThrottledPacketLagException {
+ long startTime = System.currentTimeMillis();
+ long waitEndTime = startTime + maxWaitTime;
synchronized(this) {
while(!(sent || lostConn || re != null || err
!= null)) {
try {
- wait(10*1000);
+ long wait = waitEndTime -
System.currentTimeMillis();
+ if(wait > 0)
+ wait(10*1000);
+ if(wait <= 0) {
+ throw new
ThrottledPacketLagException();
+ }
} catch (InterruptedException e) {
// Ignore
}
@@ -65,9 +72,9 @@
}
}
- public void sendPacket(Message msg, PeerNode pn) throws
NotConnectedException {
+ public void sendPacket(Message msg, PeerNode pn, long maxWaitTime)
throws NotConnectedException, ThrottledPacketLagException {
ThrottledPacket p = queuePacket(msg, pn);
- p.waitUntilSent();
+ p.waitUntilSent(maxWaitTime);
}
private ThrottledPacket queuePacket(Message msg, PeerNode pn) throws
NotConnectedException {
Modified: trunk/freenet/src/freenet/node/Version.java
===================================================================
--- trunk/freenet/src/freenet/node/Version.java 2005-11-26 00:26:12 UTC (rev
7618)
+++ trunk/freenet/src/freenet/node/Version.java 2005-11-26 00:55:28 UTC (rev
7619)
@@ -20,10 +20,10 @@
public static final String protocolVersion = "1.0";
/** The build number of the current revision */
- public static final int buildNumber = 233;
+ public static final int buildNumber = 234;
/** Oldest build of Fred we will talk to */
- public static final int lastGoodBuild = 232;
+ public static final int lastGoodBuild = 234;
/** The highest reported build of fred */
public static int highestSeenBuild = buildNumber;