Author: toad
Date: 2008-03-07 15:18:04 +0000 (Fri, 07 Mar 2008)
New Revision: 18410
Added:
trunk/freenet/src/freenet/io/xfer/WaitedTooLongException.java
Modified:
trunk/freenet/src/freenet/io/comm/PeerContext.java
trunk/freenet/src/freenet/io/comm/RetrievalException.java
trunk/freenet/src/freenet/io/xfer/BlockTransmitter.java
trunk/freenet/src/freenet/io/xfer/BulkTransmitter.java
trunk/freenet/src/freenet/io/xfer/PacketThrottle.java
trunk/freenet/src/freenet/io/xfer/ThrottleDeprecatedException.java
trunk/freenet/src/freenet/node/PeerNode.java
Log:
Add a timeout for sendThrottledMessage(). Should also fix leaking
BlockTransmitters on disconnection.
Modified: trunk/freenet/src/freenet/io/comm/PeerContext.java
===================================================================
--- trunk/freenet/src/freenet/io/comm/PeerContext.java 2008-03-07 13:29:54 UTC
(rev 18409)
+++ trunk/freenet/src/freenet/io/comm/PeerContext.java 2008-03-07 15:18:04 UTC
(rev 18410)
@@ -6,6 +6,7 @@
import java.lang.ref.WeakReference;
import freenet.io.xfer.PacketThrottle;
+import freenet.io.xfer.WaitedTooLongException;
import freenet.node.OutgoingPacketMangler;
/**
@@ -35,7 +36,7 @@
public void sendAsync(Message msg, AsyncMessageCallback cb, int
alreadyReportedBytes, ByteCounter ctr) throws NotConnectedException;
/** Send a throttled message to the node (may block for a long time). */
- public void sendThrottledMessage(Message msg, int packetSize,
ByteCounter ctr) throws NotConnectedException;
+ public void sendThrottledMessage(Message msg, int packetSize,
ByteCounter ctr, int timeout) throws NotConnectedException,
WaitedTooLongException;
/** Get the current boot ID. This is a random number that changes every
time the node starts up. */
public long getBootID();
Modified: trunk/freenet/src/freenet/io/comm/RetrievalException.java
===================================================================
--- trunk/freenet/src/freenet/io/comm/RetrievalException.java 2008-03-07
13:29:54 UTC (rev 18409)
+++ trunk/freenet/src/freenet/io/comm/RetrievalException.java 2008-03-07
15:18:04 UTC (rev 18410)
@@ -39,6 +39,7 @@
public static final int NO_DATAINSERT = 8;
public static final int CANCELLED_BY_RECEIVER = 9;
public static final int RECEIVER_DIED = 11;
+ public static final int UNABLE_TO_SEND_BLOCK_WITHIN_TIMEOUT = 12;
int _reason;
String _cause;
@@ -83,6 +84,8 @@
return "CANCELLED_BY_RECEIVER";
case UNKNOWN:
return "UNKNOWN";
+ case UNABLE_TO_SEND_BLOCK_WITHIN_TIMEOUT:
+ return "UNABLE_TO_SEND_BLOCK_WITHIN_TIMEOUT";
default:
return "UNKNOWN ("+reason+")";
}
Modified: trunk/freenet/src/freenet/io/xfer/BlockTransmitter.java
===================================================================
--- trunk/freenet/src/freenet/io/xfer/BlockTransmitter.java 2008-03-07
13:29:54 UTC (rev 18409)
+++ trunk/freenet/src/freenet/io/xfer/BlockTransmitter.java 2008-03-07
15:18:04 UTC (rev 18410)
@@ -97,7 +97,7 @@
}
int totalPackets;
try {
-
_destination.sendThrottledMessage(DMT.createPacketTransmit(_uid, packetNo,
_sentPackets, _prb.getPacket(packetNo)), _prb._packetSize, _ctr);
+
_destination.sendThrottledMessage(DMT.createPacketTransmit(_uid, packetNo,
_sentPackets, _prb.getPacket(packetNo)), _prb._packetSize, _ctr, SEND_TIMEOUT);
if(_ctr != null)
_ctr.sentPayload(_prb._packetSize);
totalPackets=_prb.getNumPackets();
} catch (NotConnectedException e) {
@@ -108,6 +108,12 @@
Logger.normal(this,
"Terminating send due to abort: "+e);
//the send() thread should
notice...
return;
+ } catch (WaitedTooLongException e) {
+ Logger.normal(this, "Waited too
long to send packet, aborting");
+ synchronized(_senderThread) {
+ _sendComplete = true;
+ }
+ return;
}
synchronized (_senderThread) {
_sentPackets.setBit(packetNo,
true);
@@ -164,6 +170,9 @@
executor.execute(_senderThread, toString());
while (true) {
+ synchronized(_senderThread) {
+ if(_sendComplete) return false;
+ }
Message msg;
boolean logMINOR =
Logger.shouldLog(Logger.MINOR, this);
try {
Modified: trunk/freenet/src/freenet/io/xfer/BulkTransmitter.java
===================================================================
--- trunk/freenet/src/freenet/io/xfer/BulkTransmitter.java 2008-03-07
13:29:54 UTC (rev 18409)
+++ trunk/freenet/src/freenet/io/xfer/BulkTransmitter.java 2008-03-07
15:18:04 UTC (rev 18410)
@@ -243,7 +243,7 @@
// Congestion control and bandwidth limiting
try {
-
peer.sendThrottledMessage(DMT.createFNPBulkPacketSend(uid, blockNo, buf),
prb.blockSize, ctr);
+
peer.sendThrottledMessage(DMT.createFNPBulkPacketSend(uid, blockNo, buf),
prb.blockSize, ctr, BulkReceiver.TIMEOUT);
if(ctr != null) ctr.sentPayload(prb.blockSize);
synchronized(this) {
blocksNotSentButPresent.setBit(blockNo,
false);
@@ -254,6 +254,9 @@
if(logMINOR)
Logger.minor(this, "Canclled: not
connected "+this);
return false;
+ } catch (WaitedTooLongException e) {
+ Logger.error(this, "Failed to send bulk packet
"+blockNo+" for "+this);
+ return false;
}
}
}
Modified: trunk/freenet/src/freenet/io/xfer/PacketThrottle.java
===================================================================
--- trunk/freenet/src/freenet/io/xfer/PacketThrottle.java 2008-03-07
13:29:54 UTC (rev 18409)
+++ trunk/freenet/src/freenet/io/xfer/PacketThrottle.java 2008-03-07
15:18:04 UTC (rev 18410)
@@ -159,7 +159,7 @@
return ((PACKET_SIZE * 1000.0 / getDelay()));
}
- public void sendThrottledMessage(Message msg, PeerContext peer,
DoubleTokenBucket overallThrottle, int packetSize, ByteCounter ctr) throws
NotConnectedException, ThrottleDeprecatedException {
+ public void sendThrottledMessage(Message msg, PeerContext peer,
DoubleTokenBucket overallThrottle, int packetSize, ByteCounter ctr, long
deadline) throws NotConnectedException, ThrottleDeprecatedException,
WaitedTooLongException {
long start = System.currentTimeMillis();
long bootID = peer.getBootID();
synchronized(this) {
@@ -177,8 +177,22 @@
break;
}
if(logMINOR) Logger.minor(this, "Window size:
"+windowSize+" packets in flight "+_packetsInFlight+" for "+this);
+ long now = System.currentTimeMillis();
+ int waitFor = (int)Math.min(Integer.MAX_VALUE,
deadline - now);
+ if(waitFor <= 0) {
+ // Double-check.
+ if(!peer.isConnected()) {
+ Logger.error(this, "Not
notified of disconnection before timeout");
+ throw new
NotConnectedException();
+ }
+ if(bootID != peer.getBootID()) {
+ Logger.error(this, "Not
notified of reconnection before timeout");
+ throw new
NotConnectedException();
+ }
+ Logger.error(this, "Unable to send
throttled message, waited "+(now-start)+"ms");
+ }
try {
- wait();
+ wait(waitFor);
} catch (InterruptedException e) {
// Ignore
}
Modified: trunk/freenet/src/freenet/io/xfer/ThrottleDeprecatedException.java
===================================================================
--- trunk/freenet/src/freenet/io/xfer/ThrottleDeprecatedException.java
2008-03-07 13:29:54 UTC (rev 18409)
+++ trunk/freenet/src/freenet/io/xfer/ThrottleDeprecatedException.java
2008-03-07 15:18:04 UTC (rev 18410)
@@ -1,3 +1,6 @@
+/* This code is part of Freenet. It is distributed under the GNU General
+ * Public License, version 2 (or at your option any later version). See
+ * http://www.gnu.org/ for further details of the GPL. */
package freenet.io.xfer;
/**
Added: trunk/freenet/src/freenet/io/xfer/WaitedTooLongException.java
===================================================================
--- trunk/freenet/src/freenet/io/xfer/WaitedTooLongException.java
(rev 0)
+++ trunk/freenet/src/freenet/io/xfer/WaitedTooLongException.java
2008-03-07 15:18:04 UTC (rev 18410)
@@ -0,0 +1,12 @@
+/* This code is part of Freenet. It is distributed under the GNU General
+ * Public License, version 2 (or at your option any later version). See
+ * http://www.gnu.org/ for further details of the GPL. */
+package freenet.io.xfer;
+
+/**
+ * Thrown when we wait too long to send a throttled packet. Declared by the sendThrottledMessage() methods of PeerContext and PacketThrottle, which take a timeout / deadline.
+ * @author toad
+ */
+public class WaitedTooLongException extends Exception {
+
+}
Modified: trunk/freenet/src/freenet/node/PeerNode.java
===================================================================
--- trunk/freenet/src/freenet/node/PeerNode.java 2008-03-07 13:29:54 UTC
(rev 18409)
+++ trunk/freenet/src/freenet/node/PeerNode.java 2008-03-07 15:18:04 UTC
(rev 18410)
@@ -53,6 +53,7 @@
import freenet.io.comm.SocketHandler;
import freenet.io.xfer.PacketThrottle;
import freenet.io.xfer.ThrottleDeprecatedException;
+import freenet.io.xfer.WaitedTooLongException;
import freenet.keys.ClientSSK;
import freenet.keys.FreenetURI;
import freenet.keys.Key;
@@ -3644,10 +3645,11 @@
return resendBytesSent;
}
- public void sendThrottledMessage(Message msg, int packetSize,
ByteCounter ctr) throws NotConnectedException {
+ public void sendThrottledMessage(Message msg, int packetSize,
ByteCounter ctr, int timeout) throws NotConnectedException,
WaitedTooLongException {
+ long deadline = System.currentTimeMillis() + timeout;
for(int i=0;i<100;i++) {
try {
- getThrottle().sendThrottledMessage(msg, this,
node.outputThrottle, packetSize, ctr);
+ getThrottle().sendThrottledMessage(msg, this,
node.outputThrottle, packetSize, ctr, deadline);
return;
} catch (ThrottleDeprecatedException e) {
// Try with the new throttle. We don't need it,
we'll get it from getThrottle().