Author: toad
Date: 2008-03-07 17:25:35 +0000 (Fri, 07 Mar 2008)
New Revision: 18412

Added:
   trunk/freenet/src/freenet/support/SerialExecutor.java
Modified:
   trunk/freenet/src/freenet/client/FetchContext.java
   trunk/freenet/src/freenet/client/HighLevelSimpleClientImpl.java
   trunk/freenet/src/freenet/client/async/SplitFileFetcher.java
   trunk/freenet/src/freenet/node/NodeClientCore.java
Log:
Serialize all the heavy datastore accesses due to splitfile scheduling.
Specifically this means that restarting all requests on startup will take 
longer, but thrash the node less.

Modified: trunk/freenet/src/freenet/client/FetchContext.java
===================================================================
--- trunk/freenet/src/freenet/client/FetchContext.java  2008-03-07 15:22:02 UTC 
(rev 18411)
+++ trunk/freenet/src/freenet/client/FetchContext.java  2008-03-07 17:25:35 UTC 
(rev 18412)
@@ -55,6 +55,7 @@
        public Set allowedMIMETypes;
        public final Ticker ticker;
        public final Executor executor;
+       public final Executor slowSerialExecutor;

        public FetchContext(long curMaxLength, 
                        long curMaxTempLength, int maxMetadataSize, int 
maxRecursionLevel, int maxArchiveRestarts, int maxArchiveLevels,
@@ -64,9 +65,11 @@
                        int maxDataBlocksPerSegment, int 
maxCheckBlocksPerSegment,
                        RandomSource random, ArchiveManager archiveManager, 
BucketFactory bucketFactory,
                        ClientEventProducer producer, boolean 
cacheLocalRequests, USKManager uskManager, 
-                       HealingQueue hq, boolean ignoreTooManyPathComponents, 
Ticker ticker, Executor executor) {
+                       HealingQueue hq, boolean ignoreTooManyPathComponents, 
Ticker ticker, Executor executor, 
+                       Executor slowSerialExecutor) {
                this.ticker = ticker;
                this.executor = executor;
+               this.slowSerialExecutor = slowSerialExecutor;
                this.maxOutputLength = curMaxLength;
                this.uskManager = uskManager;
                this.maxTempLength = curMaxTempLength;
@@ -100,6 +103,7 @@
                        this.eventProducer = new SimpleEventProducer();
                this.ticker = ctx.ticker;
                this.executor = ctx.executor;
+               this.slowSerialExecutor = ctx.slowSerialExecutor;
                this.uskManager = ctx.uskManager;
                this.ignoreTooManyPathComponents = 
ctx.ignoreTooManyPathComponents;
                this.blocks = ctx.blocks;

Modified: trunk/freenet/src/freenet/client/HighLevelSimpleClientImpl.java
===================================================================
--- trunk/freenet/src/freenet/client/HighLevelSimpleClientImpl.java     
2008-03-07 15:22:02 UTC (rev 18411)
+++ trunk/freenet/src/freenet/client/HighLevelSimpleClientImpl.java     
2008-03-07 17:25:35 UTC (rev 18412)
@@ -24,6 +24,7 @@
 import freenet.node.NodeClientCore;
 import freenet.node.RequestScheduler;
 import freenet.node.RequestStarter;
+import freenet.support.Executor;
 import freenet.support.Logger;
 import freenet.support.api.Bucket;
 import freenet.support.api.BucketFactory;
@@ -48,6 +49,7 @@
        private int curMaxMetadataLength;
        private final RandomSource random;
        private final HealingQueue healingQueue;
+       private final Executor slowSerialExecutor;
        /** See comments in Node */
        private final boolean cacheLocalRequests;
        private final boolean forceDontIgnoreTooManyPathComponents;
@@ -83,8 +85,9 @@
        static final int SPLITFILE_CHECK_BLOCKS_PER_SEGMENT = 64;


-       public HighLevelSimpleClientImpl(NodeClientCore node, ArchiveManager 
mgr, BucketFactory bf, RandomSource r, boolean cacheLocalRequests, short 
priorityClass, boolean forceDontIgnoreTooManyPathComponents) {
+       public HighLevelSimpleClientImpl(NodeClientCore node, ArchiveManager 
mgr, BucketFactory bf, RandomSource r, boolean cacheLocalRequests, short 
priorityClass, boolean forceDontIgnoreTooManyPathComponents, Executor 
slowSerialExecutor) {
                this.core = node;
+               this.slowSerialExecutor = slowSerialExecutor;
                archiveManager = mgr;
                this.priorityClass = priorityClass;
                bucketFactory = bf;
@@ -197,7 +200,7 @@
                                MAX_SPLITFILE_BLOCKS_PER_SEGMENT, 
MAX_SPLITFILE_CHECK_BLOCKS_PER_SEGMENT,
                                random, archiveManager, bucketFactory, 
globalEventProducer, 
                                cacheLocalRequests, core.uskManager, 
healingQueue, 
-                               forceDontIgnoreTooManyPathComponents ? false : 
core.ignoreTooManyPathComponents, core.getTicker(), core.getExecutor());
+                               forceDontIgnoreTooManyPathComponents ? false : 
core.ignoreTooManyPathComponents, core.getTicker(), core.getExecutor(), 
slowSerialExecutor);
        }

        public InsertContext getInsertContext(boolean forceNonPersistent) {

Modified: trunk/freenet/src/freenet/client/async/SplitFileFetcher.java
===================================================================
--- trunk/freenet/src/freenet/client/async/SplitFileFetcher.java        
2008-03-07 15:22:02 UTC (rev 18411)
+++ trunk/freenet/src/freenet/client/async/SplitFileFetcher.java        
2008-03-07 17:25:35 UTC (rev 18412)
@@ -284,7 +284,7 @@
        }

        public void scheduleOffThread() {
-               fetchContext.executor.execute(new Runnable() {
+               fetchContext.slowSerialExecutor.execute(new Runnable() {
                        public void run() {
                                schedule();
                        }

Modified: trunk/freenet/src/freenet/node/NodeClientCore.java
===================================================================
--- trunk/freenet/src/freenet/node/NodeClientCore.java  2008-03-07 15:22:02 UTC 
(rev 18411)
+++ trunk/freenet/src/freenet/node/NodeClientCore.java  2008-03-07 17:25:35 UTC 
(rev 18412)
@@ -49,6 +49,7 @@
 import freenet.support.Base64;
 import freenet.support.Executor;
 import freenet.support.Logger;
+import freenet.support.SerialExecutor;
 import freenet.support.SimpleFieldSet;
 import freenet.support.api.BooleanCallback;
 import freenet.support.api.IntCallback;
@@ -105,6 +106,7 @@
        /** If true, requests are resumed lazily i.e. startup does not block 
waiting for them. */
        private boolean lazyResume;
        protected final Persister persister;
+       private final SerialExecutor clientSlowSerialExecutor;

        public static int maxBackgroundUSKFetchers;

@@ -123,6 +125,7 @@
                this.nodeStats = node.nodeStats;
                this.random = node.random;
                this.backgroundBlockEncoder = new BackgroundBlockEncoder();
+               clientSlowSerialExecutor = new 
SerialExecutor(NativeThread.LOW_PRIORITY);
                backgroundBlockEncoderThread = new 
NativeThread(backgroundBlockEncoder, "Background block encoder", 
NativeThread.MIN_PRIORITY, false);
                backgroundBlockEncoderThread.setDaemon(true);
                byte[] pwdBuf = new byte[16];
@@ -413,8 +416,9 @@
                        fcpServer.maybeStart();
                if(tmci != null)
                        tmci.start();
+               clientSlowSerialExecutor.start(node.executor, "Heavy client 
jobs runner");

-               node.executor.execute(new Runnable() {
+               clientSlowSerialExecutor.execute(new Runnable() {
                        public void run() {
                                System.out.println("Resuming persistent 
requests");
                                Logger.normal(this, "Resuming persistent 
requests");
@@ -990,7 +994,7 @@
        }

        public HighLevelSimpleClient makeClient(short prioClass, boolean 
forceDontIgnoreTooManyPathComponents) {
-               return new HighLevelSimpleClientImpl(this, archiveManager, 
tempBucketFactory, random, !Node.DONT_CACHE_LOCAL_REQUESTS, prioClass, 
forceDontIgnoreTooManyPathComponents);
+               return new HighLevelSimpleClientImpl(this, archiveManager, 
tempBucketFactory, random, !Node.DONT_CACHE_LOCAL_REQUESTS, prioClass, 
forceDontIgnoreTooManyPathComponents, clientSlowSerialExecutor);
        }

        public FCPServer getFCPServer() {

Added: trunk/freenet/src/freenet/support/SerialExecutor.java
===================================================================
--- trunk/freenet/src/freenet/support/SerialExecutor.java                       
        (rev 0)
+++ trunk/freenet/src/freenet/support/SerialExecutor.java       2008-03-07 
17:25:35 UTC (rev 18412)
@@ -0,0 +1,84 @@
+package freenet.support;
+
+import java.util.LinkedList;
+
+import freenet.node.PrioRunnable;
+import freenet.support.io.NativeThread;
+
+public class SerialExecutor implements Executor {
+
+       private final LinkedList jobs;
+       private final int priority;
+       private boolean waiting;
+       
+       private final Runnable runner = new PrioRunnable() {
+
+               public int getPriority() {
+                       return priority;
+               }
+
+               public void run() {
+                       while(true) {
+                               Runnable job;
+                               synchronized(jobs) {
+                                       if(jobs.isEmpty()) {
+                                               waiting = true;
+                                               try {
+                                                       jobs.wait();
+                                               } catch (InterruptedException 
e) {
+                                                       // Ignore
+                                               }
+                                               continue;
+                                       } else {
+                                               job = (Runnable) 
jobs.removeFirst();
+                                               waiting = false;
+                                       }
+                               }
+                               try {
+                                       job.run();
+                               } catch (Throwable t) {
+                                       Logger.error(this, "Caught "+t, t);
+                                       Logger.error(this, "While running 
"+job+" on "+this);
+                               }
+                       }
+               }
+               
+       };
+       
+       public SerialExecutor(int priority) {
+               jobs = new LinkedList();
+               this.priority = priority;
+       }
+       
+       public void start(Executor realExecutor, String name) {
+               realExecutor.execute(runner, name);
+       }
+       
+       public void execute(Runnable job, String jobName) {
+               synchronized(jobs) {
+                       jobs.addLast(job);
+               }
+       }
+
+       public void execute(Runnable job, String jobName, boolean fromTicker) {
+               synchronized(jobs) {
+                       jobs.addLast(job);
+               }
+       }
+
+       public int[] runningThreads() {
+               int[] running = new int[NativeThread.JAVA_PRIORITY_RANGE+1];
+               running[priority] = 1;
+               return running;
+       }
+
+       public int[] waitingThreads() {
+               int[] running = new int[NativeThread.JAVA_PRIORITY_RANGE+1];
+               synchronized(jobs) {
+                       if(waiting)
+                               running[priority] = 1;
+               }
+               return running;
+       }
+
+}


Reply via email to