Author: toad
Date: 2008-06-05 19:26:09 +0000 (Thu, 05 Jun 2008)
New Revision: 20225
Added:
branches/db4o/freenet/src/freenet/client/async/ClientContext.java
Modified:
branches/db4o/freenet/src/freenet/client/FECQueue.java
branches/db4o/freenet/src/freenet/client/async/ClientRequestScheduler.java
branches/db4o/freenet/src/freenet/client/async/ClientRequester.java
branches/db4o/freenet/src/freenet/client/async/DBJob.java
branches/db4o/freenet/src/freenet/client/async/SplitFileFetcherSegment.java
branches/db4o/freenet/src/freenet/client/async/SplitFileFetcherSubSegment.java
branches/db4o/freenet/src/freenet/node/NodeClientCore.java
branches/db4o/freenet/src/freenet/node/RequestScheduler.java
branches/db4o/freenet/src/freenet/node/RequestStarterGroup.java
Log:
Wire in the FECQueue, well start to.
New class ClientContext contains stuff that a client layer operation may need
that is transient.
CRS itself is inappropriate because there are four of them; NodeClientCore
might work, but I'm using this.
Modified: branches/db4o/freenet/src/freenet/client/FECQueue.java
===================================================================
--- branches/db4o/freenet/src/freenet/client/FECQueue.java 2008-06-05
18:44:34 UTC (rev 20224)
+++ branches/db4o/freenet/src/freenet/client/FECQueue.java 2008-06-05
19:26:09 UTC (rev 20225)
@@ -27,7 +27,7 @@
* Sorted by priority and then by time added.
* @author toad
*/
-class FECQueue implements OOMHook {
+public class FECQueue implements OOMHook {
private transient LinkedList[] transientQueue;
private transient LinkedList[] persistentQueueCache;
@@ -148,7 +148,7 @@
} else {
databaseJobRunner.queue(new DBJob() {
- public void
run(ObjectContainer container, RequestScheduler sched) {
+ public void
run(ObjectContainer container) {
if(job.isADecodingJob)
job.callback.onDecodedSegment(container);
else
@@ -180,7 +180,7 @@
private final DBJob cacheFillerJob = new DBJob() {
- public void run(ObjectContainer container, RequestScheduler
sched) {
+ public void run(ObjectContainer container) {
// Try to avoid accessing the database while
synchronized on the FECQueue.
while(true) {
boolean addedAny = false;
Added: branches/db4o/freenet/src/freenet/client/async/ClientContext.java
===================================================================
--- branches/db4o/freenet/src/freenet/client/async/ClientContext.java
(rev 0)
+++ branches/db4o/freenet/src/freenet/client/async/ClientContext.java
2008-06-05 19:26:09 UTC (rev 20225)
@@ -0,0 +1,30 @@
+/* This code is part of Freenet. It is distributed under the GNU General
+ * Public License, version 2 (or at your option any later version). See
+ * http://www.gnu.org/ for further details of the GPL. */
+package freenet.client.async;
+
+import freenet.client.FECQueue;
+import freenet.node.NodeClientCore;
+
+/**
+ * Object passed in to client-layer operations, containing references to
essential but transient objects
+ * such as the schedulers and the FEC queue.
+ * @author toad
+ */
+public class ClientContext {
+
+ public final FECQueue fecQueue;
+ public final ClientRequestScheduler sskFetchScheduler;
+ public final ClientRequestScheduler chkFetchScheduler;
+ public final ClientRequestScheduler sskInsertScheduler;
+ public final ClientRequestScheduler chkInsertScheduler;
+
+ public ClientContext(NodeClientCore core) {
+ this.fecQueue = core.fecQueue;
+ this.sskFetchScheduler = core.requestStarters.sskFetchScheduler;
+ this.chkFetchScheduler = core.requestStarters.chkFetchScheduler;
+ this.sskInsertScheduler = core.requestStarters.sskPutScheduler;
+ this.chkInsertScheduler = core.requestStarters.chkPutScheduler;
+ }
+
+}
Modified:
branches/db4o/freenet/src/freenet/client/async/ClientRequestScheduler.java
===================================================================
--- branches/db4o/freenet/src/freenet/client/async/ClientRequestScheduler.java
2008-06-05 18:44:34 UTC (rev 20224)
+++ branches/db4o/freenet/src/freenet/client/async/ClientRequestScheduler.java
2008-06-05 19:26:09 UTC (rev 20225)
@@ -7,6 +7,7 @@
import com.db4o.ObjectContainer;
+import freenet.client.FECQueue;
import freenet.config.EnumerableOptionCallback;
import freenet.config.InvalidConfigValueException;
import freenet.config.SubConfig;
@@ -100,13 +101,14 @@
private final CooldownQueue persistentCooldownQueue;
final PrioritizedSerialExecutor databaseExecutor;
final PrioritizedSerialExecutor datastoreCheckerExecutor;
+ public final ClientContext clientContext;
public static final String PRIORITY_NONE = "NONE";
public static final String PRIORITY_SOFT = "SOFT";
public static final String PRIORITY_HARD = "HARD";
private String choosenPriorityScheduler;
- public ClientRequestScheduler(boolean forInserts, boolean forSSKs,
RandomSource random, RequestStarter starter, Node node, NodeClientCore core,
SubConfig sc, String name) {
+ public ClientRequestScheduler(boolean forInserts, boolean forSSKs,
RandomSource random, RequestStarter starter, Node node, NodeClientCore core,
SubConfig sc, String name, ClientContext context) {
this.selectorContainer = node.db;
schedCore = ClientRequestSchedulerCore.create(node, forInserts,
forSSKs, selectorContainer, COOLDOWN_PERIOD, core.clientDatabaseExecutor, this);
schedTransient = new ClientRequestSchedulerNonPersistent(this);
@@ -120,6 +122,7 @@
this.node = node;
this.isInsertScheduler = forInserts;
this.isSSKScheduler = forSSKs;
+ this.clientContext = context;
this.name = name;
sc.register(name+"_priority_policy", PRIORITY_HARD,
name.hashCode(), true, false,
@@ -687,4 +690,8 @@
}, prio, name);
}
+ public FECQueue getFECQueue() {
+ return clientContext.fecQueue;
+ }
+
}
Modified: branches/db4o/freenet/src/freenet/client/async/ClientRequester.java
===================================================================
--- branches/db4o/freenet/src/freenet/client/async/ClientRequester.java
2008-06-05 18:44:34 UTC (rev 20224)
+++ branches/db4o/freenet/src/freenet/client/async/ClientRequester.java
2008-06-05 19:26:09 UTC (rev 20225)
@@ -137,4 +137,8 @@
return client.persistent();
}
+ public boolean isPersistent() {
+ return client.persistent();
+ }
+
}
Modified: branches/db4o/freenet/src/freenet/client/async/DBJob.java
===================================================================
--- branches/db4o/freenet/src/freenet/client/async/DBJob.java 2008-06-05
18:44:34 UTC (rev 20224)
+++ branches/db4o/freenet/src/freenet/client/async/DBJob.java 2008-06-05
19:26:09 UTC (rev 20225)
@@ -14,6 +14,6 @@
*/
public interface DBJob {
- void run(ObjectContainer container, RequestScheduler sched);
+ void run(ObjectContainer container);
}
Modified:
branches/db4o/freenet/src/freenet/client/async/SplitFileFetcherSegment.java
===================================================================
--- branches/db4o/freenet/src/freenet/client/async/SplitFileFetcherSegment.java
2008-06-05 18:44:34 UTC (rev 20224)
+++ branches/db4o/freenet/src/freenet/client/async/SplitFileFetcherSegment.java
2008-06-05 19:26:09 UTC (rev 20225)
@@ -13,6 +13,7 @@
import freenet.client.FECCallback;
import freenet.client.FECCodec;
import freenet.client.FECJob;
+import freenet.client.FECQueue;
import freenet.client.FailureCodeTracker;
import freenet.client.FetchContext;
import freenet.client.FetchException;
@@ -153,7 +154,7 @@
return fatallyFailedBlocks;
}
- public void onSuccess(Bucket data, int blockNo,
SplitFileFetcherSubSegment seg, ClientKeyBlock block, ObjectContainer
container) {
+ public void onSuccess(Bucket data, int blockNo,
SplitFileFetcherSubSegment seg, ClientKeyBlock block, ObjectContainer
container, RequestScheduler sched) {
boolean decodeNow = false;
logMINOR = Logger.shouldLog(Logger.MINOR, this);
if(logMINOR) Logger.minor(this, "Fetched block "+blockNo+" on
"+seg);
@@ -200,18 +201,20 @@
seg.possiblyRemoveFromParent();
if(decodeNow) {
removeSubSegments();
- decode();
+ decode(container, sched);
}
}
- public void decode() {
+ public void decode(ObjectContainer container, RequestScheduler sched) {
// Now decode
if(logMINOR) Logger.minor(this, "Decoding
"+SplitFileFetcherSegment.this);
codec = FECCodec.getCodec(splitfileType, dataKeys.length,
checkKeys.length, blockFetchContext.executor);
if(splitfileType != Metadata.SPLITFILE_NONREDUNDANT) {
- codec.addToQueue(new FECJob(codec, dataBuckets,
checkBuckets, CHKBlock.DATA_LENGTH, fetchContext.bucketFactory, this, true));
+ FECQueue queue = sched.getFECQueue();
+ codec.addToQueue(new FECJob(codec, queue, dataBuckets,
checkBuckets, CHKBlock.DATA_LENGTH, fetchContext.bucketFactory, this, true,
parentFetcher.parent.getPriorityClass(), parentFetcher.parent.isPersistent()),
+ queue, container);
// Now have all the data blocks (not necessarily all
the check blocks)
}
}
Modified:
branches/db4o/freenet/src/freenet/client/async/SplitFileFetcherSubSegment.java
===================================================================
---
branches/db4o/freenet/src/freenet/client/async/SplitFileFetcherSubSegment.java
2008-06-05 18:44:34 UTC (rev 20224)
+++
branches/db4o/freenet/src/freenet/client/async/SplitFileFetcherSubSegment.java
2008-06-05 19:26:09 UTC (rev 20225)
@@ -256,7 +256,7 @@
onFailure(new FetchException(FetchException.CANCELLED),
token, sched, container);
return;
}
- segment.onSuccess(data, blockNo, this, block, container);
+ segment.onSuccess(data, blockNo, this, block, container, sched);
}
/** Convert a ClientKeyBlock to a Bucket. If an error occurs, report it
via onFailure
Modified: branches/db4o/freenet/src/freenet/node/NodeClientCore.java
===================================================================
--- branches/db4o/freenet/src/freenet/node/NodeClientCore.java 2008-06-05
18:44:34 UTC (rev 20224)
+++ branches/db4o/freenet/src/freenet/node/NodeClientCore.java 2008-06-05
19:26:09 UTC (rev 20225)
@@ -5,11 +5,15 @@
import java.net.URI;
import freenet.client.ArchiveManager;
+import freenet.client.FECQueue;
import freenet.client.HighLevelSimpleClient;
import freenet.client.HighLevelSimpleClientImpl;
import freenet.client.InsertContext;
import freenet.client.async.BackgroundBlockEncoder;
+import freenet.client.async.ClientContext;
import freenet.client.async.ClientRequestScheduler;
+import freenet.client.async.DBJob;
+import freenet.client.async.DBJobRunner;
import freenet.client.async.HealingQueue;
import freenet.client.async.SimpleHealingQueue;
import freenet.client.async.USKManager;
@@ -68,7 +72,7 @@
/**
* The connection between the node and the client layer.
*/
-public class NodeClientCore implements Persistable {
+public class NodeClientCore implements Persistable, DBJobRunner {
private static boolean logMINOR;
public final USKManager uskManager;
@@ -90,6 +94,7 @@
final NodeStats nodeStats;
public final RandomSource random;
final File tempDir;
+ public final FECQueue fecQueue;
// Persistent temporary buckets
public final PersistentTempBucketFactory persistentTempBucketFactory;
@@ -137,6 +142,7 @@
this.node = node;
this.nodeStats = node.nodeStats;
this.random = node.random;
+ fecQueue = new FECQueue();
this.backgroundBlockEncoder = new BackgroundBlockEncoder();
clientSlowSerialExecutor = new
SerialExecutor[RequestStarter.MINIMUM_PRIORITY_CLASS-RequestStarter.MAXIMUM_PRIORITY_CLASS+1];
for(int i=0;i<clientSlowSerialExecutor.length;i++) {
@@ -165,7 +171,7 @@
if(logMINOR) Logger.minor(this, "Read
throttleFS:\n"+throttleFS);
if(logMINOR) Logger.minor(this, "Serializing
RequestStarterGroup from:\n"+throttleFS);
- requestStarters = new RequestStarterGroup(node, this,
portNumber, random, config, throttleFS);
+ requestStarters = new RequestStarterGroup(node, this,
portNumber, random, config, throttleFS, new ClientContext(this));
// Temp files
@@ -1153,4 +1159,19 @@
public long countTransientQueuedRequests() {
return requestStarters.countTransientQueuedRequests();
}
+
+ public void queue(final DBJob job, int priority) {
+ this.clientDatabaseExecutor.execute(new Runnable() {
+
+ public void run() {
+ try {
+ job.run(node.db);
+ node.db.commit();
+ } catch (Throwable t) {
+ Logger.error(this, "Failed to run
database job "+job+" : caught "+t, t);
+ }
+ }
+
+ }, priority, ""+job);
+ }
}
Modified: branches/db4o/freenet/src/freenet/node/RequestScheduler.java
===================================================================
--- branches/db4o/freenet/src/freenet/node/RequestScheduler.java
2008-06-05 18:44:34 UTC (rev 20224)
+++ branches/db4o/freenet/src/freenet/node/RequestScheduler.java
2008-06-05 19:26:09 UTC (rev 20225)
@@ -5,6 +5,7 @@
import java.util.LinkedList;
+import freenet.client.FECQueue;
import freenet.client.async.ChosenRequest;
import freenet.keys.ClientKey;
import freenet.keys.Key;
@@ -61,5 +62,7 @@
public void callFailure(final SendableInsert put, final
LowLevelPutException e, final Object keyNum, int prio, String name);
public void callSuccess(final SendableInsert put, final Object keyNum,
int prio, String name);
+
+ public FECQueue getFECQueue();
}
Modified: branches/db4o/freenet/src/freenet/node/RequestStarterGroup.java
===================================================================
--- branches/db4o/freenet/src/freenet/node/RequestStarterGroup.java
2008-06-05 18:44:34 UTC (rev 20224)
+++ branches/db4o/freenet/src/freenet/node/RequestStarterGroup.java
2008-06-05 19:26:09 UTC (rev 20225)
@@ -3,6 +3,7 @@
* http://www.gnu.org/ for further details of the GPL. */
package freenet.node;
+import freenet.client.async.ClientContext;
import freenet.client.async.ClientRequestScheduler;
import freenet.config.Config;
import freenet.config.SubConfig;
@@ -34,7 +35,7 @@
public final ClientRequestScheduler sskFetchScheduler;
public final ClientRequestScheduler sskPutScheduler;
- RequestStarterGroup(Node node, NodeClientCore core, int portNumber,
RandomSource random, Config config, SimpleFieldSet fs) {
+ RequestStarterGroup(Node node, NodeClientCore core, int portNumber,
RandomSource random, Config config, SimpleFieldSet fs, ClientContext ctx) {
SubConfig schedulerConfig = new SubConfig("node.scheduler",
config);
NodeStats stats = core.nodeStats;
@@ -45,27 +46,27 @@
throttleWindowRequest = new ThrottleWindowManager(2.0, fs ==
null ? null : fs.subset("ThrottleWindowRequest"), node);
chkRequestThrottle = new MyRequestThrottle(throttleWindow,
5000, "CHK Request", fs == null ? null : fs.subset("CHKRequestThrottle"),
32768);
chkRequestStarter = new RequestStarter(core,
chkRequestThrottle, "CHK Request starter ("+portNumber+ ')',
stats.requestOutputThrottle, stats.requestInputThrottle,
stats.localChkFetchBytesSentAverage, stats.localChkFetchBytesReceivedAverage,
false, false);
- chkFetchScheduler = new ClientRequestScheduler(false, false,
random, chkRequestStarter, node, core, schedulerConfig, "CHKrequester");
+ chkFetchScheduler = new ClientRequestScheduler(false, false,
random, chkRequestStarter, node, core, schedulerConfig, "CHKrequester", ctx);
chkRequestStarter.setScheduler(chkFetchScheduler);
chkRequestStarter.start();
//insertThrottle = new ChainedRequestThrottle(10000, 2.0F,
requestThrottle);
// FIXME reenable the above
chkInsertThrottle = new MyRequestThrottle(throttleWindow,
20000, "CHK Insert", fs == null ? null : fs.subset("CHKInsertThrottle"), 32768);
chkInsertStarter = new RequestStarter(core, chkInsertThrottle,
"CHK Insert starter ("+portNumber+ ')', stats.requestOutputThrottle,
stats.requestInputThrottle, stats.localChkInsertBytesSentAverage,
stats.localChkInsertBytesReceivedAverage, true, false);
- chkPutScheduler = new ClientRequestScheduler(true, false,
random, chkInsertStarter, node, core, schedulerConfig, "CHKinserter");
+ chkPutScheduler = new ClientRequestScheduler(true, false,
random, chkInsertStarter, node, core, schedulerConfig, "CHKinserter", ctx);
chkInsertStarter.setScheduler(chkPutScheduler);
chkInsertStarter.start();
sskRequestThrottle = new MyRequestThrottle(throttleWindow,
5000, "SSK Request", fs == null ? null : fs.subset("SSKRequestThrottle"), 1024);
sskRequestStarter = new RequestStarter(core,
sskRequestThrottle, "SSK Request starter ("+portNumber+ ')',
stats.requestOutputThrottle, stats.requestInputThrottle,
stats.localSskFetchBytesSentAverage, stats.localSskFetchBytesReceivedAverage,
false, true);
- sskFetchScheduler = new ClientRequestScheduler(false, true,
random, sskRequestStarter, node, core, schedulerConfig, "SSKrequester");
+ sskFetchScheduler = new ClientRequestScheduler(false, true,
random, sskRequestStarter, node, core, schedulerConfig, "SSKrequester", ctx);
sskRequestStarter.setScheduler(sskFetchScheduler);
sskRequestStarter.start();
//insertThrottle = new ChainedRequestThrottle(10000, 2.0F,
requestThrottle);
// FIXME reenable the above
sskInsertThrottle = new MyRequestThrottle(throttleWindow,
20000, "SSK Insert", fs == null ? null : fs.subset("SSKInsertThrottle"), 1024);
sskInsertStarter = new RequestStarter(core, sskInsertThrottle,
"SSK Insert starter ("+portNumber+ ')', stats.requestOutputThrottle,
stats.requestInputThrottle, stats.localSskInsertBytesSentAverage,
stats.localSskFetchBytesReceivedAverage, true, true);
- sskPutScheduler = new ClientRequestScheduler(true, true,
random, sskInsertStarter, node, core, schedulerConfig, "SSKinserter");
+ sskPutScheduler = new ClientRequestScheduler(true, true,
random, sskInsertStarter, node, core, schedulerConfig, "SSKinserter", ctx);
sskInsertStarter.setScheduler(sskPutScheduler);
sskInsertStarter.start();