Author: toad
Date: 2008-04-24 13:17:14 +0000 (Thu, 24 Apr 2008)
New Revision: 19541

Modified:
   trunk/freenet/src/freenet/node/FailureTable.java
   trunk/freenet/src/freenet/node/Node.java
Log:
FailureTable does disk I/O (datastore access) on the UdpSocketHandler thread.
This is bad: a slow disk can stall that thread and cause transfer timeouts.
Run the offer handling on a separate SerialExecutor thread instead.

Modified: trunk/freenet/src/freenet/node/FailureTable.java
===================================================================
--- trunk/freenet/src/freenet/node/FailureTable.java    2008-04-24 04:47:27 UTC 
(rev 19540)
+++ trunk/freenet/src/freenet/node/FailureTable.java    2008-04-24 13:17:14 UTC 
(rev 19541)
@@ -21,6 +21,7 @@
 import freenet.keys.SSKBlock;
 import freenet.support.LRUHashtable;
 import freenet.support.Logger;
+import freenet.support.SerialExecutor;
 import freenet.support.io.NativeThread;

 // FIXME it is ESSENTIAL that we delete the ULPR data on requestors etc once 
we have found the key.
@@ -71,9 +72,14 @@
                node.random.nextBytes(offerAuthenticatorKey);
                logMINOR = Logger.shouldLog(Logger.MINOR, this);
                logDEBUG = Logger.shouldLog(Logger.DEBUG, this);
+               offerExecutor = new SerialExecutor(NativeThread.HIGH_PRIORITY);
                node.ps.queueTimedJob(new FailureTableCleaner(), 
CLEANUP_PERIOD);
        }

+       public void start() {
+               offerExecutor.start(node.executor, "FailureTable offers 
executor");
+       }
+       
        /**
         * Called when we route to a node and it fails for some reason, but we 
continue the request.
         * Normally the timeout will be the time it took to route to that node 
and wait for its 
@@ -234,6 +240,10 @@
                entry.offer();
        }

+       /** Run onOffer() on a separate thread since it can block for disk I/O, 
and we don't want to cause 
+        * transfer timeouts etc because of slow disk. */
+       private final SerialExecutor offerExecutor;
+       
        /**
         * Called when we get an offer for a key. If this is an SSK, we will 
only accept it if we have previously asked for it.
         * If it is a CHK, we will accept it if we want it.
@@ -241,12 +251,11 @@
         * @param peer The node offering it.
         * @param authenticator 
         */
-       void onOffer(Key key, PeerNode peer, byte[] authenticator) {
+       void onOffer(final Key key, final PeerNode peer, final byte[] 
authenticator) {
                if(!node.enableULPRDataPropagation) return;
                if(logMINOR)
                        Logger.minor(this, "Offered key "+key+" by peer "+peer);
                FailureTableEntry entry;
-               long now = System.currentTimeMillis();
                synchronized(this) {
                        entry = (FailureTableEntry) entriesByKey.get(key);
                        if(entry == null) {
@@ -254,11 +263,31 @@
                                return; // we haven't asked for it
                        }
                }
+               offerExecutor.execute(new Runnable() {
+                       public void run() {
+                               innerOfferKey(key, peer, authenticator);
+                       }
+               }, "onOffer()");
+       }
+
+       protected void innerOfferKey(Key key, PeerNode peer, byte[] 
authenticator) {
                //NB: node.hasKey() executes a datastore fetch
                if(node.hasKey(key)) {
                        Logger.minor(this, "Already have key");
                        return;
-               }               
+               }
+               
+               // Re-check after potentially long disk I/O.
+               FailureTableEntry entry;
+               long now = System.currentTimeMillis();
+               synchronized(this) {
+                       entry = (FailureTableEntry) entriesByKey.get(key);
+                       if(entry == null) {
+                               if(logMINOR) Logger.minor(this, "We didn't ask 
for the key");
+                               return; // we haven't asked for it
+                       }
+               }
+
                /*
                 * Accept (subject to later checks) if we asked for it.
                 * Should we accept it if we were asked for it? This is 
"bidirectional propagation".
@@ -347,7 +376,19 @@
         * @param source The node that asked for the key.
         * @throws NotConnectedException If the sender ceases to be connected.
         */
-       public void sendOfferedKey(Key key, final boolean isSSK, boolean 
needPubKey, final long uid, final PeerNode source) throws NotConnectedException 
{
+       public void sendOfferedKey(final Key key, final boolean isSSK, final 
boolean needPubKey, final long uid, final PeerNode source) throws 
NotConnectedException {
+               this.offerExecutor.execute(new Runnable() {
+                       public void run() {
+                               try {
+                                       innerSendOfferedKey(key, isSSK, 
needPubKey, uid, source);
+                               } catch (NotConnectedException e) {
+                                       // Too bad.
+                               }
+                       }
+               }, "sendOfferedKey");
+       }
+       
+       protected void innerSendOfferedKey(Key key, final boolean isSSK, 
boolean needPubKey, final long uid, final PeerNode source) throws 
NotConnectedException {
                if(isSSK) {
                        SSKBlock block = node.fetch((NodeSSK)key, false);
                        if(block == null) {
@@ -423,7 +464,7 @@
                }, "CHK offer sender");
                }
        }
-       
+
        public final OfferedKeysByteCounter senderCounter = new 
OfferedKeysByteCounter();

        class OfferedKeysByteCounter implements ByteCounter {

Modified: trunk/freenet/src/freenet/node/Node.java
===================================================================
--- trunk/freenet/src/freenet/node/Node.java    2008-04-24 04:47:27 UTC (rev 
19540)
+++ trunk/freenet/src/freenet/node/Node.java    2008-04-24 13:17:14 UTC (rev 
19541)
@@ -1622,6 +1622,7 @@
                peers.start(); // must be before usm
                nodeStats.start();
                uptime.start();
+               failureTable.start();

                darknetCrypto.start(disableHangCheckers);
                if(opennet != null)


Reply via email to