Author: toad
Date: 2008-04-24 13:17:14 +0000 (Thu, 24 Apr 2008)
New Revision: 19541
Modified:
trunk/freenet/src/freenet/node/FailureTable.java
trunk/freenet/src/freenet/node/Node.java
Log:
FailureTable does disk I/O (datastore access) on the UdpSocketHandler thread.
THIS IS BAD. Use a separate SerialExecutor thread.
Modified: trunk/freenet/src/freenet/node/FailureTable.java
===================================================================
--- trunk/freenet/src/freenet/node/FailureTable.java 2008-04-24 04:47:27 UTC
(rev 19540)
+++ trunk/freenet/src/freenet/node/FailureTable.java 2008-04-24 13:17:14 UTC
(rev 19541)
@@ -21,6 +21,7 @@
import freenet.keys.SSKBlock;
import freenet.support.LRUHashtable;
import freenet.support.Logger;
+import freenet.support.SerialExecutor;
import freenet.support.io.NativeThread;
// FIXME it is ESSENTIAL that we delete the ULPR data on requestors etc once
we have found the key.
@@ -71,9 +72,14 @@
node.random.nextBytes(offerAuthenticatorKey);
logMINOR = Logger.shouldLog(Logger.MINOR, this);
logDEBUG = Logger.shouldLog(Logger.DEBUG, this);
+ offerExecutor = new SerialExecutor(NativeThread.HIGH_PRIORITY);
node.ps.queueTimedJob(new FailureTableCleaner(),
CLEANUP_PERIOD);
}
+ public void start() {
+ offerExecutor.start(node.executor, "FailureTable offers
executor");
+ }
+
/**
* Called when we route to a node and it fails for some reason, but we
continue the request.
* Normally the timeout will be the time it took to route to that node
and wait for its
@@ -234,6 +240,10 @@
entry.offer();
}
+ /** Run onOffer() on a separate thread since it can block for disk I/O,
and we don't want to cause
+ * transfer timeouts etc because of slow disk. */
+ private final SerialExecutor offerExecutor;
+
/**
* Called when we get an offer for a key. If this is an SSK, we will
only accept it if we have previously asked for it.
* If it is a CHK, we will accept it if we want it.
@@ -241,12 +251,11 @@
* @param peer The node offering it.
* @param authenticator
*/
- void onOffer(Key key, PeerNode peer, byte[] authenticator) {
+ void onOffer(final Key key, final PeerNode peer, final byte[]
authenticator) {
if(!node.enableULPRDataPropagation) return;
if(logMINOR)
Logger.minor(this, "Offered key "+key+" by peer "+peer);
FailureTableEntry entry;
- long now = System.currentTimeMillis();
synchronized(this) {
entry = (FailureTableEntry) entriesByKey.get(key);
if(entry == null) {
@@ -254,11 +263,31 @@
return; // we haven't asked for it
}
}
+ offerExecutor.execute(new Runnable() {
+ public void run() {
+ innerOfferKey(key, peer, authenticator);
+ }
+ }, "onOffer()");
+ }
+
+ protected void innerOfferKey(Key key, PeerNode peer, byte[]
authenticator) {
//NB: node.hasKey() executes a datastore fetch
if(node.hasKey(key)) {
Logger.minor(this, "Already have key");
return;
- }
+ }
+
+ // Re-check after potentially long disk I/O.
+ FailureTableEntry entry;
+ long now = System.currentTimeMillis();
+ synchronized(this) {
+ entry = (FailureTableEntry) entriesByKey.get(key);
+ if(entry == null) {
+ if(logMINOR) Logger.minor(this, "We didn't ask
for the key");
+ return; // we haven't asked for it
+ }
+ }
+
/*
* Accept (subject to later checks) if we asked for it.
* Should we accept it if we were asked for it? This is
"bidirectional propagation".
@@ -347,7 +376,19 @@
* @param source The node that asked for the key.
* @throws NotConnectedException If the sender ceases to be connected.
*/
- public void sendOfferedKey(Key key, final boolean isSSK, boolean
needPubKey, final long uid, final PeerNode source) throws NotConnectedException
{
+ public void sendOfferedKey(final Key key, final boolean isSSK, final
boolean needPubKey, final long uid, final PeerNode source) throws
NotConnectedException {
+ this.offerExecutor.execute(new Runnable() {
+ public void run() {
+ try {
+ innerSendOfferedKey(key, isSSK,
needPubKey, uid, source);
+ } catch (NotConnectedException e) {
+ // Too bad.
+ }
+ }
+ }, "sendOfferedKey");
+ }
+
+ protected void innerSendOfferedKey(Key key, final boolean isSSK,
boolean needPubKey, final long uid, final PeerNode source) throws
NotConnectedException {
if(isSSK) {
SSKBlock block = node.fetch((NodeSSK)key, false);
if(block == null) {
@@ -423,7 +464,7 @@
}, "CHK offer sender");
}
}
-
+
public final OfferedKeysByteCounter senderCounter = new
OfferedKeysByteCounter();
class OfferedKeysByteCounter implements ByteCounter {
Modified: trunk/freenet/src/freenet/node/Node.java
===================================================================
--- trunk/freenet/src/freenet/node/Node.java 2008-04-24 04:47:27 UTC (rev
19540)
+++ trunk/freenet/src/freenet/node/Node.java 2008-04-24 13:17:14 UTC (rev
19541)
@@ -1622,6 +1622,7 @@
peers.start(); // must be before usm
nodeStats.start();
uptime.start();
+ failureTable.start();
darknetCrypto.start(disableHangCheckers);
if(opennet != null)