Author: toad
Date: 2008-03-07 17:25:35 +0000 (Fri, 07 Mar 2008)
New Revision: 18412
Added:
trunk/freenet/src/freenet/support/SerialExecutor.java
Modified:
trunk/freenet/src/freenet/client/FetchContext.java
trunk/freenet/src/freenet/client/HighLevelSimpleClientImpl.java
trunk/freenet/src/freenet/client/async/SplitFileFetcher.java
trunk/freenet/src/freenet/node/NodeClientCore.java
Log:
Serialize all the heavy datastore accesses due to splitfile scheduling.
Specifically this means that restarting all requests on startup will take
longer, but trash the node less.
Modified: trunk/freenet/src/freenet/client/FetchContext.java
===================================================================
--- trunk/freenet/src/freenet/client/FetchContext.java 2008-03-07 15:22:02 UTC
(rev 18411)
+++ trunk/freenet/src/freenet/client/FetchContext.java 2008-03-07 17:25:35 UTC
(rev 18412)
@@ -55,6 +55,7 @@
public Set allowedMIMETypes;
public final Ticker ticker;
public final Executor executor;
+ public final Executor slowSerialExecutor;
public FetchContext(long curMaxLength,
long curMaxTempLength, int maxMetadataSize, int
maxRecursionLevel, int maxArchiveRestarts, int maxArchiveLevels,
@@ -64,9 +65,11 @@
int maxDataBlocksPerSegment, int
maxCheckBlocksPerSegment,
RandomSource random, ArchiveManager archiveManager,
BucketFactory bucketFactory,
ClientEventProducer producer, boolean
cacheLocalRequests, USKManager uskManager,
- HealingQueue hq, boolean ignoreTooManyPathComponents,
Ticker ticker, Executor executor) {
+ HealingQueue hq, boolean ignoreTooManyPathComponents,
Ticker ticker, Executor executor,
+ Executor slowSerialExecutor) {
this.ticker = ticker;
this.executor = executor;
+ this.slowSerialExecutor = slowSerialExecutor;
this.maxOutputLength = curMaxLength;
this.uskManager = uskManager;
this.maxTempLength = curMaxTempLength;
@@ -100,6 +103,7 @@
this.eventProducer = new SimpleEventProducer();
this.ticker = ctx.ticker;
this.executor = ctx.executor;
+ this.slowSerialExecutor = ctx.slowSerialExecutor;
this.uskManager = ctx.uskManager;
this.ignoreTooManyPathComponents =
ctx.ignoreTooManyPathComponents;
this.blocks = ctx.blocks;
Modified: trunk/freenet/src/freenet/client/HighLevelSimpleClientImpl.java
===================================================================
--- trunk/freenet/src/freenet/client/HighLevelSimpleClientImpl.java
2008-03-07 15:22:02 UTC (rev 18411)
+++ trunk/freenet/src/freenet/client/HighLevelSimpleClientImpl.java
2008-03-07 17:25:35 UTC (rev 18412)
@@ -24,6 +24,7 @@
import freenet.node.NodeClientCore;
import freenet.node.RequestScheduler;
import freenet.node.RequestStarter;
+import freenet.support.Executor;
import freenet.support.Logger;
import freenet.support.api.Bucket;
import freenet.support.api.BucketFactory;
@@ -48,6 +49,7 @@
private int curMaxMetadataLength;
private final RandomSource random;
private final HealingQueue healingQueue;
+ private final Executor slowSerialExecutor;
/** See comments in Node */
private final boolean cacheLocalRequests;
private final boolean forceDontIgnoreTooManyPathComponents;
@@ -83,8 +85,9 @@
static final int SPLITFILE_CHECK_BLOCKS_PER_SEGMENT = 64;
- public HighLevelSimpleClientImpl(NodeClientCore node, ArchiveManager
mgr, BucketFactory bf, RandomSource r, boolean cacheLocalRequests, short
priorityClass, boolean forceDontIgnoreTooManyPathComponents) {
+ public HighLevelSimpleClientImpl(NodeClientCore node, ArchiveManager
mgr, BucketFactory bf, RandomSource r, boolean cacheLocalRequests, short
priorityClass, boolean forceDontIgnoreTooManyPathComponents, Executor
slowSerialExecutor) {
this.core = node;
+ this.slowSerialExecutor = slowSerialExecutor;
archiveManager = mgr;
this.priorityClass = priorityClass;
bucketFactory = bf;
@@ -197,7 +200,7 @@
MAX_SPLITFILE_BLOCKS_PER_SEGMENT,
MAX_SPLITFILE_CHECK_BLOCKS_PER_SEGMENT,
random, archiveManager, bucketFactory,
globalEventProducer,
cacheLocalRequests, core.uskManager,
healingQueue,
- forceDontIgnoreTooManyPathComponents ? false :
core.ignoreTooManyPathComponents, core.getTicker(), core.getExecutor());
+ forceDontIgnoreTooManyPathComponents ? false :
core.ignoreTooManyPathComponents, core.getTicker(), core.getExecutor(),
slowSerialExecutor);
}
public InsertContext getInsertContext(boolean forceNonPersistent) {
Modified: trunk/freenet/src/freenet/client/async/SplitFileFetcher.java
===================================================================
--- trunk/freenet/src/freenet/client/async/SplitFileFetcher.java
2008-03-07 15:22:02 UTC (rev 18411)
+++ trunk/freenet/src/freenet/client/async/SplitFileFetcher.java
2008-03-07 17:25:35 UTC (rev 18412)
@@ -284,7 +284,7 @@
}
public void scheduleOffThread() {
- fetchContext.executor.execute(new Runnable() {
+ fetchContext.slowSerialExecutor.execute(new Runnable() {
public void run() {
schedule();
}
Modified: trunk/freenet/src/freenet/node/NodeClientCore.java
===================================================================
--- trunk/freenet/src/freenet/node/NodeClientCore.java 2008-03-07 15:22:02 UTC
(rev 18411)
+++ trunk/freenet/src/freenet/node/NodeClientCore.java 2008-03-07 17:25:35 UTC
(rev 18412)
@@ -49,6 +49,7 @@
import freenet.support.Base64;
import freenet.support.Executor;
import freenet.support.Logger;
+import freenet.support.SerialExecutor;
import freenet.support.SimpleFieldSet;
import freenet.support.api.BooleanCallback;
import freenet.support.api.IntCallback;
@@ -105,6 +106,7 @@
/** If true, requests are resumed lazily i.e. startup does not block
waiting for them. */
private boolean lazyResume;
protected final Persister persister;
+ private final SerialExecutor clientSlowSerialExecutor;
public static int maxBackgroundUSKFetchers;
@@ -123,6 +125,7 @@
this.nodeStats = node.nodeStats;
this.random = node.random;
this.backgroundBlockEncoder = new BackgroundBlockEncoder();
+ clientSlowSerialExecutor = new
SerialExecutor(NativeThread.LOW_PRIORITY);
backgroundBlockEncoderThread = new
NativeThread(backgroundBlockEncoder, "Background block encoder",
NativeThread.MIN_PRIORITY, false);
backgroundBlockEncoderThread.setDaemon(true);
byte[] pwdBuf = new byte[16];
@@ -413,8 +416,9 @@
fcpServer.maybeStart();
if(tmci != null)
tmci.start();
+ clientSlowSerialExecutor.start(node.executor, "Heavy client
jobs runner");
- node.executor.execute(new Runnable() {
+ clientSlowSerialExecutor.execute(new Runnable() {
public void run() {
System.out.println("Resuming persistent
requests");
Logger.normal(this, "Resuming persistent
requests");
@@ -990,7 +994,7 @@
}
public HighLevelSimpleClient makeClient(short prioClass, boolean
forceDontIgnoreTooManyPathComponents) {
- return new HighLevelSimpleClientImpl(this, archiveManager,
tempBucketFactory, random, !Node.DONT_CACHE_LOCAL_REQUESTS, prioClass,
forceDontIgnoreTooManyPathComponents);
+ return new HighLevelSimpleClientImpl(this, archiveManager,
tempBucketFactory, random, !Node.DONT_CACHE_LOCAL_REQUESTS, prioClass,
forceDontIgnoreTooManyPathComponents, clientSlowSerialExecutor);
}
public FCPServer getFCPServer() {
Added: trunk/freenet/src/freenet/support/SerialExecutor.java
===================================================================
--- trunk/freenet/src/freenet/support/SerialExecutor.java
(rev 0)
+++ trunk/freenet/src/freenet/support/SerialExecutor.java 2008-03-07
17:25:35 UTC (rev 18412)
@@ -0,0 +1,84 @@
+package freenet.support;
+
+import java.util.LinkedList;
+
+import freenet.node.PrioRunnable;
+import freenet.support.io.NativeThread;
+
+public class SerialExecutor implements Executor {
+
+ private final LinkedList jobs;
+ private final int priority;
+ private boolean waiting;
+
+ private final Runnable runner = new PrioRunnable() {
+
+ public int getPriority() {
+ return priority;
+ }
+
+ public void run() {
+ while(true) {
+ Runnable job;
+ synchronized(jobs) {
+ if(jobs.isEmpty()) {
+ waiting = true;
+ try {
+ jobs.wait();
+ } catch (InterruptedException
e) {
+ // Ignore
+ }
+ continue;
+ } else {
+ job = (Runnable)
jobs.removeFirst();
+ waiting = false;
+ }
+ }
+ try {
+ job.run();
+ } catch (Throwable t) {
+ Logger.error(this, "Caught "+t, t);
+ Logger.error(this, "While running
"+job+" on "+this);
+ }
+ }
+ }
+
+ };
+
+ public SerialExecutor(int priority) {
+ jobs = new LinkedList();
+ this.priority = priority;
+ }
+
+ public void start(Executor realExecutor, String name) {
+ realExecutor.execute(runner, name);
+ }
+
+ public void execute(Runnable job, String jobName) {
+ synchronized(jobs) {
+ jobs.addLast(job);
+ }
+ }
+
+ public void execute(Runnable job, String jobName, boolean fromTicker) {
+ synchronized(jobs) {
+ jobs.addLast(job);
+ }
+ }
+
+ public int[] runningThreads() {
+ int[] running = new int[NativeThread.JAVA_PRIORITY_RANGE+1];
+ running[priority] = 1;
+ return running;
+ }
+
+ public int[] waitingThreads() {
+ int[] running = new int[NativeThread.JAVA_PRIORITY_RANGE+1];
+ synchronized(jobs) {
+ if(waiting)
+ running[priority] = 1;
+ }
+ return running;
+ }
+
+}