Author: toad
Date: 2008-04-25 19:04:57 +0000 (Fri, 25 Apr 2008)
New Revision: 19560
Added:
trunk/freenet/src/freenet/node/SyncSendWaitedTooLongException.java
Modified:
trunk/freenet/src/freenet/io/comm/PeerContext.java
trunk/freenet/src/freenet/io/xfer/BlockTransmitter.java
trunk/freenet/src/freenet/io/xfer/BulkTransmitter.java
trunk/freenet/src/freenet/io/xfer/PacketThrottle.java
trunk/freenet/src/freenet/node/FailureTable.java
trunk/freenet/src/freenet/node/PeerNode.java
trunk/freenet/src/freenet/node/RequestHandler.java
trunk/freenet/src/freenet/node/SSKInsertHandler.java
trunk/freenet/src/freenet/node/SSKInsertSender.java
Log:
sendThrottledMessage() now has the option to send synchronously (with a
timeout, and an exception)
Use this to make sure we've sent the packet and got the callback before calling
applyByteCounts().
Not as big a change as the last commit in terms of increasing the SSK request
average bytes...
Modified: trunk/freenet/src/freenet/io/comm/PeerContext.java
===================================================================
--- trunk/freenet/src/freenet/io/comm/PeerContext.java 2008-04-25 18:44:50 UTC
(rev 19559)
+++ trunk/freenet/src/freenet/io/comm/PeerContext.java 2008-04-25 19:04:57 UTC
(rev 19560)
@@ -8,6 +8,7 @@
import freenet.io.xfer.PacketThrottle;
import freenet.io.xfer.WaitedTooLongException;
import freenet.node.OutgoingPacketMangler;
+import freenet.node.SyncSendWaitedTooLongException;
/**
* @author amphibian
@@ -35,8 +36,9 @@
/** Send a message to the node */
public void sendAsync(Message msg, AsyncMessageCallback cb, int
alreadyReportedBytes, ByteCounter ctr) throws NotConnectedException;
- /** Send a throttled message to the node (may block for a long time). */
- public void sendThrottledMessage(Message msg, int packetSize,
ByteCounter ctr, int timeout) throws NotConnectedException,
WaitedTooLongException;
+ /** Send a throttled message to the node (may block for a long time).
+ * @throws SyncSendWaitedTooLongException */
+ public void sendThrottledMessage(Message msg, int packetSize,
ByteCounter ctr, int timeout, boolean waitForSent) throws
NotConnectedException, WaitedTooLongException, SyncSendWaitedTooLongException;
/** Get the current boot ID. This is a random number that changes every
time the node starts up. */
public long getBootID();
Modified: trunk/freenet/src/freenet/io/xfer/BlockTransmitter.java
===================================================================
--- trunk/freenet/src/freenet/io/xfer/BlockTransmitter.java 2008-04-25
18:44:50 UTC (rev 19559)
+++ trunk/freenet/src/freenet/io/xfer/BlockTransmitter.java 2008-04-25
19:04:57 UTC (rev 19560)
@@ -31,6 +31,7 @@
import freenet.io.comm.PeerContext;
import freenet.io.comm.RetrievalException;
import freenet.node.PrioRunnable;
+import freenet.node.SyncSendWaitedTooLongException;
import freenet.support.BitArray;
import freenet.support.Executor;
import freenet.support.Logger;
@@ -98,7 +99,7 @@
}
int totalPackets;
try {
-
_destination.sendThrottledMessage(DMT.createPacketTransmit(_uid, packetNo,
_sentPackets, _prb.getPacket(packetNo)), _prb._packetSize, _ctr, SEND_TIMEOUT);
+
_destination.sendThrottledMessage(DMT.createPacketTransmit(_uid, packetNo,
_sentPackets, _prb.getPacket(packetNo)), _prb._packetSize, _ctr, SEND_TIMEOUT,
false);
totalPackets=_prb.getNumPackets();
} catch (NotConnectedException e) {
Logger.normal(this,
"Terminating send: "+e);
@@ -114,6 +115,10 @@
_sendComplete = true;
}
return;
+ } catch (SyncSendWaitedTooLongException
e) {
+ // Impossible
+ Logger.error(this, "Impossible:
Caught "+e, e);
+ return;
}
synchronized (_senderThread) {
_sentPackets.setBit(packetNo,
true);
Modified: trunk/freenet/src/freenet/io/xfer/BulkTransmitter.java
===================================================================
--- trunk/freenet/src/freenet/io/xfer/BulkTransmitter.java 2008-04-25
18:44:50 UTC (rev 19559)
+++ trunk/freenet/src/freenet/io/xfer/BulkTransmitter.java 2008-04-25
19:04:57 UTC (rev 19560)
@@ -11,6 +11,7 @@
import freenet.io.comm.MessageFilter;
import freenet.io.comm.NotConnectedException;
import freenet.io.comm.PeerContext;
+import freenet.node.SyncSendWaitedTooLongException;
import freenet.support.BitArray;
import freenet.support.Logger;
@@ -244,7 +245,7 @@
// Congestion control and bandwidth limiting
try {
-
peer.sendThrottledMessage(DMT.createFNPBulkPacketSend(uid, blockNo, buf),
buf.length, ctr, BulkReceiver.TIMEOUT);
+
peer.sendThrottledMessage(DMT.createFNPBulkPacketSend(uid, blockNo, buf),
buf.length, ctr, BulkReceiver.TIMEOUT, false);
synchronized(this) {
blocksNotSentButPresent.setBit(blockNo,
false);
}
@@ -257,6 +258,10 @@
} catch (WaitedTooLongException e) {
Logger.error(this, "Failed to send bulk packet
"+blockNo+" for "+this);
return false;
+ } catch (SyncSendWaitedTooLongException e) {
+ // Impossible
+ Logger.error(this, "Impossible: Caught "+e, e);
+ return false;
}
}
}
Modified: trunk/freenet/src/freenet/io/xfer/PacketThrottle.java
===================================================================
--- trunk/freenet/src/freenet/io/xfer/PacketThrottle.java 2008-04-25
18:44:50 UTC (rev 19559)
+++ trunk/freenet/src/freenet/io/xfer/PacketThrottle.java 2008-04-25
19:04:57 UTC (rev 19560)
@@ -25,6 +25,7 @@
import freenet.io.comm.Peer;
import freenet.io.comm.PeerContext;
import freenet.node.PeerNode;
+import freenet.node.SyncSendWaitedTooLongException;
import freenet.support.DoubleTokenBucket;
import freenet.support.Logger;
@@ -146,7 +147,7 @@
return ((PACKET_SIZE * 1000.0 / getDelay()));
}
- public void sendThrottledMessage(Message msg, PeerContext peer,
DoubleTokenBucket overallThrottle, int packetSize, ByteCounter ctr, long
deadline) throws NotConnectedException, ThrottleDeprecatedException,
WaitedTooLongException {
+ public void sendThrottledMessage(Message msg, PeerContext peer,
DoubleTokenBucket overallThrottle, int packetSize, ByteCounter ctr, long
deadline, boolean blockForSend) throws NotConnectedException,
ThrottleDeprecatedException, WaitedTooLongException,
SyncSendWaitedTooLongException {
long start = System.currentTimeMillis();
long bootID = peer.getBootID();
synchronized(this) {
@@ -238,6 +239,23 @@
Logger.minor(this, "Not throttling
"+peer.shortToString()+" for "+this);
peer.sendAsync(msg, callback, packetSize, ctr);
ctr.sentPayload(packetSize);
+ if(blockForSend) {
+ synchronized(callback) {
+ long timeout =
System.currentTimeMillis() + 60*1000;
+ long now;
+ while((now =
System.currentTimeMillis()) < timeout && !callback.finished) {
+ try {
+
callback.wait((int)(timeout - now));
+ } catch (InterruptedException
e) {
+ // Ignore
+ }
+ }
+ if(!callback.finished) {
+ throw new
SyncSendWaitedTooLongException();
+ }
+ }
+ }
+
} catch (RuntimeException e) {
callback.fatalError();
throw e;
Modified: trunk/freenet/src/freenet/node/FailureTable.java
===================================================================
--- trunk/freenet/src/freenet/node/FailureTable.java 2008-04-25 18:44:50 UTC
(rev 19559)
+++ trunk/freenet/src/freenet/node/FailureTable.java 2008-04-25 19:04:57 UTC
(rev 19560)
@@ -421,12 +421,14 @@
public void run() {
try {
-
source.sendThrottledMessage(data, dataLength, senderCounter, 60*1000);
+
source.sendThrottledMessage(data, dataLength, senderCounter, 60*1000, false);
} catch (NotConnectedException e) {
// :(
} catch (WaitedTooLongException e) {
// :<
Logger.error(this, "Waited too
long sending SSK data");
+ } catch (SyncSendWaitedTooLongException
e) {
+ // Impossible
} finally {
node.unlockUID(uid, isSSK,
false, false, true, false);
}
Modified: trunk/freenet/src/freenet/node/PeerNode.java
===================================================================
--- trunk/freenet/src/freenet/node/PeerNode.java 2008-04-25 18:44:50 UTC
(rev 19559)
+++ trunk/freenet/src/freenet/node/PeerNode.java 2008-04-25 19:04:57 UTC
(rev 19560)
@@ -3874,12 +3874,12 @@
return resendBytesSent;
}
- public void sendThrottledMessage(Message msg, int packetSize,
ByteCounter ctr, int timeout) throws NotConnectedException,
WaitedTooLongException {
+ public void sendThrottledMessage(Message msg, int packetSize,
ByteCounter ctr, int timeout, boolean blockForSend) throws
NotConnectedException, WaitedTooLongException, SyncSendWaitedTooLongException {
long deadline = System.currentTimeMillis() + timeout;
if(logMINOR) Logger.minor(this, "Sending throttled message with
timeout "+timeout+" packet size "+packetSize+" to "+shortToString());
for(int i=0;i<100;i++) {
try {
- getThrottle().sendThrottledMessage(msg, this,
node.outputThrottle, packetSize, ctr, deadline);
+ getThrottle().sendThrottledMessage(msg, this,
node.outputThrottle, packetSize, ctr, deadline, blockForSend);
return;
} catch (ThrottleDeprecatedException e) {
// Try with the new throttle. We don't need it,
we'll get it from getThrottle().
Modified: trunk/freenet/src/freenet/node/RequestHandler.java
===================================================================
--- trunk/freenet/src/freenet/node/RequestHandler.java 2008-04-25 18:44:50 UTC
(rev 19559)
+++ trunk/freenet/src/freenet/node/RequestHandler.java 2008-04-25 19:04:57 UTC
(rev 19560)
@@ -350,13 +350,15 @@
public void run() {
try {
- source.sendThrottledMessage(dataMsg,
data.length, RequestHandler.this, 60*1000);
+ source.sendThrottledMessage(dataMsg,
data.length, RequestHandler.this, 60*1000, true);
applyByteCounts();
} catch (NotConnectedException e) {
// Okay
} catch (WaitedTooLongException e) {
// Grrrr
- Logger.error(this, "Waited too long to
send SSK data on "+RequestHandler.this);
+ Logger.error(this, "Waited too long to
send SSK data on "+RequestHandler.this+" because of bwlimiting");
+ } catch (SyncSendWaitedTooLongException e) {
+ Logger.error(this, "Waited too long to
send SSK data on "+RequestHandler.this+" because of peer");
} finally {
unregisterRequestHandlerWithNode();
}
@@ -382,7 +384,12 @@
Message headersMsg = DMT.createFNPSSKDataFoundHeaders(uid,
headers);
source.sendAsync(headersMsg, null, 0, ctr);
final Message dataMsg = DMT.createFNPSSKDataFoundData(uid,
data);
- source.sendThrottledMessage(dataMsg, data.length, ctr, 60*1000);
+ try {
+ source.sendThrottledMessage(dataMsg, data.length, ctr,
60*1000, false);
+ } catch (SyncSendWaitedTooLongException e) {
+ // Impossible
+ throw new Error(e);
+ }
if(SEND_OLD_FORMAT_SSK) {
Message df = DMT.createFNPSSKDataFound(uid, headers,
data);
Modified: trunk/freenet/src/freenet/node/SSKInsertHandler.java
===================================================================
--- trunk/freenet/src/freenet/node/SSKInsertHandler.java 2008-04-25
18:44:50 UTC (rev 19559)
+++ trunk/freenet/src/freenet/node/SSKInsertHandler.java 2008-04-25
19:04:57 UTC (rev 19560)
@@ -184,7 +184,7 @@
if(logMINOR) Logger.minor(this, "Lost
connection to source on "+uid);
return;
} catch (WaitedTooLongException e1) {
- Logger.error(this, "Took too long to send ssk
datareply to "+uid);
+ Logger.error(this, "Took too long to send ssk
datareply to "+uid+" (because of throttling)");
return;
}
block = storedBlock;
@@ -247,7 +247,7 @@
if(logMINOR) Logger.minor(this, "Lost
connection to source on "+uid);
return;
} catch (WaitedTooLongException e1) {
- Logger.error(this, "Took too long to
send ssk datareply to "+uid);
+ Logger.error(this, "Took too long to
send ssk datareply to "+uid+" because of bwlimiting");
return;
}
}
Modified: trunk/freenet/src/freenet/node/SSKInsertSender.java
===================================================================
--- trunk/freenet/src/freenet/node/SSKInsertSender.java 2008-04-25 18:44:50 UTC
(rev 19559)
+++ trunk/freenet/src/freenet/node/SSKInsertSender.java 2008-04-25 19:04:57 UTC
(rev 19560)
@@ -249,13 +249,15 @@
try {
next.sendAsync(headersMsg, null, 0, this);
- next.sendThrottledMessage(dataMsg, data.length,
this, SSKInsertHandler.DATA_INSERT_TIMEOUT);
+ next.sendThrottledMessage(dataMsg, data.length,
this, SSKInsertHandler.DATA_INSERT_TIMEOUT, false);
} catch (NotConnectedException e1) {
if(logMINOR) Logger.minor(this, "Not connected
to "+next);
continue;
} catch (WaitedTooLongException e) {
Logger.error(this, "Waited too long to send
"+dataMsg+" to "+next+" on "+this);
continue;
+ } catch (SyncSendWaitedTooLongException e) {
+ // Impossible
}
// Do we need to send them the pubkey?
Added: trunk/freenet/src/freenet/node/SyncSendWaitedTooLongException.java
===================================================================
--- trunk/freenet/src/freenet/node/SyncSendWaitedTooLongException.java
(rev 0)
+++ trunk/freenet/src/freenet/node/SyncSendWaitedTooLongException.java
2008-04-25 19:04:57 UTC (rev 19560)
@@ -0,0 +1,11 @@
+package freenet.node;
+
+/**
+ * This exception is thrown when we try to do a blocking send of a message,
and it takes too long
+ * so we time out. Compare to WaitedTooLongException, which is thrown when we
are waiting for clearance
+ * from bandwidth limiting and don't get it within a timeout period.
+ * @author toad
+ */
+public class SyncSendWaitedTooLongException extends Exception {
+
+}