Author: toad
Date: 2008-06-05 19:26:09 +0000 (Thu, 05 Jun 2008)
New Revision: 20225

Added:
   branches/db4o/freenet/src/freenet/client/async/ClientContext.java
Modified:
   branches/db4o/freenet/src/freenet/client/FECQueue.java
   branches/db4o/freenet/src/freenet/client/async/ClientRequestScheduler.java
   branches/db4o/freenet/src/freenet/client/async/ClientRequester.java
   branches/db4o/freenet/src/freenet/client/async/DBJob.java
   branches/db4o/freenet/src/freenet/client/async/SplitFileFetcherSegment.java
   
branches/db4o/freenet/src/freenet/client/async/SplitFileFetcherSubSegment.java
   branches/db4o/freenet/src/freenet/node/NodeClientCore.java
   branches/db4o/freenet/src/freenet/node/RequestScheduler.java
   branches/db4o/freenet/src/freenet/node/RequestStarterGroup.java
Log:
Wire in the FECQueue, well start to.
New class ClientContext contains stuff that a client layer operation may need 
that is transient.
CRS itself is inappropriate because there are four of them; NodeClientCore 
might work, but I'm using this.

Modified: branches/db4o/freenet/src/freenet/client/FECQueue.java
===================================================================
--- branches/db4o/freenet/src/freenet/client/FECQueue.java      2008-06-05 
18:44:34 UTC (rev 20224)
+++ branches/db4o/freenet/src/freenet/client/FECQueue.java      2008-06-05 
19:26:09 UTC (rev 20225)
@@ -27,7 +27,7 @@
  * Sorted by priority and then by time added.
  * @author toad
  */
-class FECQueue implements OOMHook {
+public class FECQueue implements OOMHook {

        private transient LinkedList[] transientQueue;
        private transient LinkedList[] persistentQueueCache;
@@ -148,7 +148,7 @@
                                                } else {
                                                        
databaseJobRunner.queue(new DBJob() {

-                                                               public void 
run(ObjectContainer container, RequestScheduler sched) {
+                                                               public void 
run(ObjectContainer container) {
                                                                        
if(job.isADecodingJob)
                                                                                
job.callback.onDecodedSegment(container);
                                                                        else
@@ -180,7 +180,7 @@

        private final DBJob cacheFillerJob = new DBJob() {

-               public void run(ObjectContainer container, RequestScheduler 
sched) {
+               public void run(ObjectContainer container) {
                        // Try to avoid accessing the database while 
synchronized on the FECQueue.
                        while(true) {
                                boolean addedAny = false;

Added: branches/db4o/freenet/src/freenet/client/async/ClientContext.java
===================================================================
--- branches/db4o/freenet/src/freenet/client/async/ClientContext.java           
                (rev 0)
+++ branches/db4o/freenet/src/freenet/client/async/ClientContext.java   
2008-06-05 19:26:09 UTC (rev 20225)
@@ -0,0 +1,30 @@
+/* This code is part of Freenet. It is distributed under the GNU General
+ * Public License, version 2 (or at your option any later version). See
+ * http://www.gnu.org/ for further details of the GPL. */
+package freenet.client.async;
+
+import freenet.client.FECQueue;
+import freenet.node.NodeClientCore;
+
+/**
+ * Object passed in to client-layer operations, containing references to 
essential but transient objects
+ * such as the schedulers and the FEC queue.
+ * @author toad
+ */
+public class ClientContext {
+       
+       public final FECQueue fecQueue;
+       public final ClientRequestScheduler sskFetchScheduler;
+       public final ClientRequestScheduler chkFetchScheduler;
+       public final ClientRequestScheduler sskInsertScheduler;
+       public final ClientRequestScheduler chkInsertScheduler;
+
+       public ClientContext(NodeClientCore core) {
+               this.fecQueue = core.fecQueue;
+               this.sskFetchScheduler = core.requestStarters.sskFetchScheduler;
+               this.chkFetchScheduler = core.requestStarters.chkFetchScheduler;
+               this.sskInsertScheduler = core.requestStarters.sskPutScheduler;
+               this.chkInsertScheduler = core.requestStarters.chkPutScheduler;
+       }
+       
+}

Modified: 
branches/db4o/freenet/src/freenet/client/async/ClientRequestScheduler.java
===================================================================
--- branches/db4o/freenet/src/freenet/client/async/ClientRequestScheduler.java  
2008-06-05 18:44:34 UTC (rev 20224)
+++ branches/db4o/freenet/src/freenet/client/async/ClientRequestScheduler.java  
2008-06-05 19:26:09 UTC (rev 20225)
@@ -7,6 +7,7 @@

 import com.db4o.ObjectContainer;

+import freenet.client.FECQueue;
 import freenet.config.EnumerableOptionCallback;
 import freenet.config.InvalidConfigValueException;
 import freenet.config.SubConfig;
@@ -100,13 +101,14 @@
        private final CooldownQueue persistentCooldownQueue;
        final PrioritizedSerialExecutor databaseExecutor;
        final PrioritizedSerialExecutor datastoreCheckerExecutor;
+       public final ClientContext clientContext;

        public static final String PRIORITY_NONE = "NONE";
        public static final String PRIORITY_SOFT = "SOFT";
        public static final String PRIORITY_HARD = "HARD";
        private String choosenPriorityScheduler; 

-       public ClientRequestScheduler(boolean forInserts, boolean forSSKs, 
RandomSource random, RequestStarter starter, Node node, NodeClientCore core, 
SubConfig sc, String name) {
+       public ClientRequestScheduler(boolean forInserts, boolean forSSKs, 
RandomSource random, RequestStarter starter, Node node, NodeClientCore core, 
SubConfig sc, String name, ClientContext context) {
                this.selectorContainer = node.db;
                schedCore = ClientRequestSchedulerCore.create(node, forInserts, 
forSSKs, selectorContainer, COOLDOWN_PERIOD, core.clientDatabaseExecutor, this);
                schedTransient = new ClientRequestSchedulerNonPersistent(this);
@@ -120,6 +122,7 @@
                this.node = node;
                this.isInsertScheduler = forInserts;
                this.isSSKScheduler = forSSKs;
+               this.clientContext = context;

                this.name = name;
                sc.register(name+"_priority_policy", PRIORITY_HARD, 
name.hashCode(), true, false,
@@ -687,4 +690,8 @@
                }, prio, name);
        }

+       public FECQueue getFECQueue() {
+               return clientContext.fecQueue;
+       }
+
 }

Modified: branches/db4o/freenet/src/freenet/client/async/ClientRequester.java
===================================================================
--- branches/db4o/freenet/src/freenet/client/async/ClientRequester.java 
2008-06-05 18:44:34 UTC (rev 20224)
+++ branches/db4o/freenet/src/freenet/client/async/ClientRequester.java 
2008-06-05 19:26:09 UTC (rev 20225)
@@ -137,4 +137,8 @@
                return client.persistent();
        }

+       public boolean isPersistent() {
+               return client.persistent();
+       }
+
 }

Modified: branches/db4o/freenet/src/freenet/client/async/DBJob.java
===================================================================
--- branches/db4o/freenet/src/freenet/client/async/DBJob.java   2008-06-05 
18:44:34 UTC (rev 20224)
+++ branches/db4o/freenet/src/freenet/client/async/DBJob.java   2008-06-05 
19:26:09 UTC (rev 20225)
@@ -14,6 +14,6 @@
  */
 public interface DBJob {

-       void run(ObjectContainer container, RequestScheduler sched);
+       void run(ObjectContainer container);

 }

Modified: 
branches/db4o/freenet/src/freenet/client/async/SplitFileFetcherSegment.java
===================================================================
--- branches/db4o/freenet/src/freenet/client/async/SplitFileFetcherSegment.java 
2008-06-05 18:44:34 UTC (rev 20224)
+++ branches/db4o/freenet/src/freenet/client/async/SplitFileFetcherSegment.java 
2008-06-05 19:26:09 UTC (rev 20225)
@@ -13,6 +13,7 @@
 import freenet.client.FECCallback;
 import freenet.client.FECCodec;
 import freenet.client.FECJob;
+import freenet.client.FECQueue;
 import freenet.client.FailureCodeTracker;
 import freenet.client.FetchContext;
 import freenet.client.FetchException;
@@ -153,7 +154,7 @@
                return fatallyFailedBlocks;
        }

-       public void onSuccess(Bucket data, int blockNo, 
SplitFileFetcherSubSegment seg, ClientKeyBlock block, ObjectContainer 
container) {
+       public void onSuccess(Bucket data, int blockNo, 
SplitFileFetcherSubSegment seg, ClientKeyBlock block, ObjectContainer 
container, RequestScheduler sched) {
                boolean decodeNow = false;
                logMINOR = Logger.shouldLog(Logger.MINOR, this);
                if(logMINOR) Logger.minor(this, "Fetched block "+blockNo+" on 
"+seg);
@@ -200,18 +201,20 @@
                seg.possiblyRemoveFromParent();
                if(decodeNow) {
                        removeSubSegments();
-                       decode();
+                       decode(container, sched);
                }
        }

-       public void decode() {
+       public void decode(ObjectContainer container, RequestScheduler sched) {
                // Now decode
                if(logMINOR) Logger.minor(this, "Decoding 
"+SplitFileFetcherSegment.this);

                codec = FECCodec.getCodec(splitfileType, dataKeys.length, 
checkKeys.length, blockFetchContext.executor);

                if(splitfileType != Metadata.SPLITFILE_NONREDUNDANT) {
-                       codec.addToQueue(new FECJob(codec, dataBuckets, 
checkBuckets, CHKBlock.DATA_LENGTH, fetchContext.bucketFactory, this, true));
+                       FECQueue queue = sched.getFECQueue();
+                       codec.addToQueue(new FECJob(codec, queue, dataBuckets, 
checkBuckets, CHKBlock.DATA_LENGTH, fetchContext.bucketFactory, this, true, 
parentFetcher.parent.getPriorityClass(), parentFetcher.parent.isPersistent()), 
+                                       queue, container);
                        // Now have all the data blocks (not necessarily all 
the check blocks)
                }
        }

Modified: 
branches/db4o/freenet/src/freenet/client/async/SplitFileFetcherSubSegment.java
===================================================================
--- 
branches/db4o/freenet/src/freenet/client/async/SplitFileFetcherSubSegment.java  
    2008-06-05 18:44:34 UTC (rev 20224)
+++ 
branches/db4o/freenet/src/freenet/client/async/SplitFileFetcherSubSegment.java  
    2008-06-05 19:26:09 UTC (rev 20225)
@@ -256,7 +256,7 @@
                        onFailure(new FetchException(FetchException.CANCELLED), 
token, sched, container);
                        return;
                }
-               segment.onSuccess(data, blockNo, this, block, container);
+               segment.onSuccess(data, blockNo, this, block, container, sched);
        }

        /** Convert a ClientKeyBlock to a Bucket. If an error occurs, report it 
via onFailure

Modified: branches/db4o/freenet/src/freenet/node/NodeClientCore.java
===================================================================
--- branches/db4o/freenet/src/freenet/node/NodeClientCore.java  2008-06-05 
18:44:34 UTC (rev 20224)
+++ branches/db4o/freenet/src/freenet/node/NodeClientCore.java  2008-06-05 
19:26:09 UTC (rev 20225)
@@ -5,11 +5,15 @@
 import java.net.URI;

 import freenet.client.ArchiveManager;
+import freenet.client.FECQueue;
 import freenet.client.HighLevelSimpleClient;
 import freenet.client.HighLevelSimpleClientImpl;
 import freenet.client.InsertContext;
 import freenet.client.async.BackgroundBlockEncoder;
+import freenet.client.async.ClientContext;
 import freenet.client.async.ClientRequestScheduler;
+import freenet.client.async.DBJob;
+import freenet.client.async.DBJobRunner;
 import freenet.client.async.HealingQueue;
 import freenet.client.async.SimpleHealingQueue;
 import freenet.client.async.USKManager;
@@ -68,7 +72,7 @@
 /**
  * The connection between the node and the client layer.
  */
-public class NodeClientCore implements Persistable {
+public class NodeClientCore implements Persistable, DBJobRunner {

        private static boolean logMINOR;
        public final USKManager uskManager;
@@ -90,6 +94,7 @@
        final NodeStats nodeStats;
        public final RandomSource random;
        final File tempDir;
+       public final FECQueue fecQueue;

        // Persistent temporary buckets
        public final PersistentTempBucketFactory persistentTempBucketFactory;
@@ -137,6 +142,7 @@
                this.node = node;
                this.nodeStats = node.nodeStats;
                this.random = node.random;
+               fecQueue = new FECQueue();
                this.backgroundBlockEncoder = new BackgroundBlockEncoder();
                clientSlowSerialExecutor = new 
SerialExecutor[RequestStarter.MINIMUM_PRIORITY_CLASS-RequestStarter.MAXIMUM_PRIORITY_CLASS+1];
                for(int i=0;i<clientSlowSerialExecutor.length;i++) {
@@ -165,7 +171,7 @@
                if(logMINOR) Logger.minor(this, "Read 
throttleFS:\n"+throttleFS);

                if(logMINOR) Logger.minor(this, "Serializing 
RequestStarterGroup from:\n"+throttleFS);
-               requestStarters = new RequestStarterGroup(node, this, 
portNumber, random, config, throttleFS);
+               requestStarters = new RequestStarterGroup(node, this, 
portNumber, random, config, throttleFS, new ClientContext(this));

                // Temp files

@@ -1153,4 +1159,19 @@
        public long countTransientQueuedRequests() {
                return requestStarters.countTransientQueuedRequests();
        }
+
+       public void queue(final DBJob job, int priority) {
+               this.clientDatabaseExecutor.execute(new Runnable() {
+
+                       public void run() {
+                               try {
+                                       job.run(node.db);
+                                       node.db.commit();
+                               } catch (Throwable t) {
+                                       Logger.error(this, "Failed to run 
database job "+job+" : caught "+t, t);
+                               }
+                       }
+                       
+               }, priority, ""+job);
+       }
 }

Modified: branches/db4o/freenet/src/freenet/node/RequestScheduler.java
===================================================================
--- branches/db4o/freenet/src/freenet/node/RequestScheduler.java        
2008-06-05 18:44:34 UTC (rev 20224)
+++ branches/db4o/freenet/src/freenet/node/RequestScheduler.java        
2008-06-05 19:26:09 UTC (rev 20225)
@@ -5,6 +5,7 @@

 import java.util.LinkedList;

+import freenet.client.FECQueue;
 import freenet.client.async.ChosenRequest;
 import freenet.keys.ClientKey;
 import freenet.keys.Key;
@@ -61,5 +62,7 @@
        public void callFailure(final SendableInsert put, final 
LowLevelPutException e, final Object keyNum, int prio, String name);

        public void callSuccess(final SendableInsert put, final Object keyNum, 
int prio, String name);
+
+       public FECQueue getFECQueue();

 }

Modified: branches/db4o/freenet/src/freenet/node/RequestStarterGroup.java
===================================================================
--- branches/db4o/freenet/src/freenet/node/RequestStarterGroup.java     
2008-06-05 18:44:34 UTC (rev 20224)
+++ branches/db4o/freenet/src/freenet/node/RequestStarterGroup.java     
2008-06-05 19:26:09 UTC (rev 20225)
@@ -3,6 +3,7 @@
  * http://www.gnu.org/ for further details of the GPL. */
 package freenet.node;

+import freenet.client.async.ClientContext;
 import freenet.client.async.ClientRequestScheduler;
 import freenet.config.Config;
 import freenet.config.SubConfig;
@@ -34,7 +35,7 @@
        public final ClientRequestScheduler sskFetchScheduler;
        public final ClientRequestScheduler sskPutScheduler;

-       RequestStarterGroup(Node node, NodeClientCore core, int portNumber, 
RandomSource random, Config config, SimpleFieldSet fs) {
+       RequestStarterGroup(Node node, NodeClientCore core, int portNumber, 
RandomSource random, Config config, SimpleFieldSet fs, ClientContext ctx) {
                SubConfig schedulerConfig = new SubConfig("node.scheduler", 
config);
                NodeStats stats = core.nodeStats;

@@ -45,27 +46,27 @@
                throttleWindowRequest = new ThrottleWindowManager(2.0, fs == 
null ? null : fs.subset("ThrottleWindowRequest"), node);
                chkRequestThrottle = new MyRequestThrottle(throttleWindow, 
5000, "CHK Request", fs == null ? null : fs.subset("CHKRequestThrottle"), 
32768);
                chkRequestStarter = new RequestStarter(core, 
chkRequestThrottle, "CHK Request starter ("+portNumber+ ')', 
stats.requestOutputThrottle, stats.requestInputThrottle, 
stats.localChkFetchBytesSentAverage, stats.localChkFetchBytesReceivedAverage, 
false, false);
-               chkFetchScheduler = new ClientRequestScheduler(false, false, 
random, chkRequestStarter, node, core, schedulerConfig, "CHKrequester");
+               chkFetchScheduler = new ClientRequestScheduler(false, false, 
random, chkRequestStarter, node, core, schedulerConfig, "CHKrequester", ctx);
                chkRequestStarter.setScheduler(chkFetchScheduler);
                chkRequestStarter.start();
                //insertThrottle = new ChainedRequestThrottle(10000, 2.0F, 
requestThrottle);
                // FIXME reenable the above
                chkInsertThrottle = new MyRequestThrottle(throttleWindow, 
20000, "CHK Insert", fs == null ? null : fs.subset("CHKInsertThrottle"), 32768);
                chkInsertStarter = new RequestStarter(core, chkInsertThrottle, 
"CHK Insert starter ("+portNumber+ ')', stats.requestOutputThrottle, 
stats.requestInputThrottle, stats.localChkInsertBytesSentAverage, 
stats.localChkInsertBytesReceivedAverage, true, false);
-               chkPutScheduler = new ClientRequestScheduler(true, false, 
random, chkInsertStarter, node, core, schedulerConfig, "CHKinserter");
+               chkPutScheduler = new ClientRequestScheduler(true, false, 
random, chkInsertStarter, node, core, schedulerConfig, "CHKinserter", ctx);
                chkInsertStarter.setScheduler(chkPutScheduler);
                chkInsertStarter.start();

                sskRequestThrottle = new MyRequestThrottle(throttleWindow, 
5000, "SSK Request", fs == null ? null : fs.subset("SSKRequestThrottle"), 1024);
                sskRequestStarter = new RequestStarter(core, 
sskRequestThrottle, "SSK Request starter ("+portNumber+ ')', 
stats.requestOutputThrottle, stats.requestInputThrottle, 
stats.localSskFetchBytesSentAverage, stats.localSskFetchBytesReceivedAverage, 
false, true);
-               sskFetchScheduler = new ClientRequestScheduler(false, true, 
random, sskRequestStarter, node, core, schedulerConfig, "SSKrequester");
+               sskFetchScheduler = new ClientRequestScheduler(false, true, 
random, sskRequestStarter, node, core, schedulerConfig, "SSKrequester", ctx);
                sskRequestStarter.setScheduler(sskFetchScheduler);
                sskRequestStarter.start();
                //insertThrottle = new ChainedRequestThrottle(10000, 2.0F, 
requestThrottle);
                // FIXME reenable the above
                sskInsertThrottle = new MyRequestThrottle(throttleWindow, 
20000, "SSK Insert", fs == null ? null : fs.subset("SSKInsertThrottle"), 1024);
                sskInsertStarter = new RequestStarter(core, sskInsertThrottle, 
"SSK Insert starter ("+portNumber+ ')', stats.requestOutputThrottle, 
stats.requestInputThrottle, stats.localSskInsertBytesSentAverage, 
stats.localSskFetchBytesReceivedAverage, true, true);
-               sskPutScheduler = new ClientRequestScheduler(true, true, 
random, sskInsertStarter, node, core, schedulerConfig, "SSKinserter");
+               sskPutScheduler = new ClientRequestScheduler(true, true, 
random, sskInsertStarter, node, core, schedulerConfig, "SSKinserter", ctx);
                sskInsertStarter.setScheduler(sskPutScheduler);
                sskInsertStarter.start();



Reply via email to