Author: toad
Date: 2008-06-21 22:13:07 +0000 (Sat, 21 Jun 2008)
New Revision: 20603

Modified:
   branches/db4o/freenet/src/freenet/client/FECQueue.java
   branches/db4o/freenet/src/freenet/node/NodeClientCore.java
Log:
Persist the FECQueue even though it has no actual persistent members except the 
tag used for finding it.
FECJobs refer to it by pointer and are queried as such, hence we need it to 
persist.
This should fix the "inserts stall at exactly 50% if you restart before FEC 
finishes" bug.

Modified: branches/db4o/freenet/src/freenet/client/FECQueue.java
===================================================================
--- branches/db4o/freenet/src/freenet/client/FECQueue.java      2008-06-21 
22:09:44 UTC (rev 20602)
+++ branches/db4o/freenet/src/freenet/client/FECQueue.java      2008-06-21 
22:13:07 UTC (rev 20603)
@@ -9,6 +9,7 @@

 import com.db4o.ObjectContainer;
 import com.db4o.ObjectSet;
+import com.db4o.query.Predicate;
 import com.db4o.query.Query;

 import freenet.client.async.ClientContext;
@@ -25,6 +26,9 @@
  * The FEC queue. Uses a limited number of threads (at most one per core), a 
non-persistent queue,
  * a persistent queue (kept in the database), and a transient cache of the 
persistent queue.
  * Sorted by priority and then by time added.
+ * 
+ * Note that the FECQueue must be pulled from the database, because FECJob's 
are queried based
+ * on their referring to it.
  * @author toad
  */
 public class FECQueue implements OOMHook {
@@ -38,9 +42,30 @@
        private transient ClientContext clientContext;
        private transient int runningFECThreads;
        private transient int fecPoolCounter;
+       private final long nodeDBHandle;

+       public static FECQueue create(final long nodeDBHandle, ObjectContainer 
container) {
+               ObjectSet result = container.query(new Predicate() {
+                       public boolean match(FECQueue queue) {
+                               if(queue.nodeDBHandle == nodeDBHandle) return 
true;
+                               return false;
+                       }
+               });
+               if(result.hasNext()) {
+                       return (FECQueue) result.next();
+               } else {
+                       FECQueue queue = new FECQueue(nodeDBHandle);
+                       container.set(queue);
+                       return queue;
+               }
+       }
+       
+       private FECQueue(long nodeDBHandle) {
+               this.nodeDBHandle = nodeDBHandle;
+       }
+
        /** Called after creating or deserializing the FECQueue. Initialises 
all the transient fields. */
-       public void init(int priorities, int maxCacheSize, DBJobRunner 
dbJobRunner, Executor exec, ClientContext clientContext) {
+       void init(int priorities, int maxCacheSize, DBJobRunner dbJobRunner, 
Executor exec, ClientContext clientContext) {
                this.priorities = priorities;
                this.maxPersistentQueueCacheSize = maxCacheSize;
                this.databaseJobRunner = dbJobRunner;
@@ -150,6 +175,8 @@
                                                        else
                                                                
job.callback.onEncodedSegment(null, clientContext);
                                                } else {
+                                                       
if(Logger.shouldLog(Logger.MINOR, this))
+                                                               
Logger.minor(this, "Scheduling callback...");
                                                        
databaseJobRunner.queue(new DBJob() {

                                                                public void 
run(ObjectContainer container, ClientContext context) {
@@ -200,7 +227,7 @@
                                                if(newCached >= 
maxPersistentQueueCacheSize) return;
                                                grab = 
maxPersistentQueueCacheSize - newCached;
                                        }
-                                       if(logMINOR) Logger.minor(this, 
"Grabbing up to "+grab+" jobs");
+                                       if(logMINOR) Logger.minor(this, 
"Grabbing up to "+grab+" jobs at priority "+prio);
                                        Query query = container.query();
                                        query.constrain(FECJob.class);
                                        query.descend("priority").constrain(new 
Short(prio));

Modified: branches/db4o/freenet/src/freenet/node/NodeClientCore.java
===================================================================
--- branches/db4o/freenet/src/freenet/node/NodeClientCore.java  2008-06-21 
22:09:44 UTC (rev 20602)
+++ branches/db4o/freenet/src/freenet/node/NodeClientCore.java  2008-06-21 
22:13:07 UTC (rev 20603)
@@ -149,7 +149,7 @@
                this.node = node;
                this.nodeStats = node.nodeStats;
                this.random = node.random;
-               fecQueue = new FECQueue();
+               fecQueue = FECQueue.create(node.nodeDBHandle, container);
                this.backgroundBlockEncoder = new BackgroundBlockEncoder();
                clientDatabaseExecutor = new 
PrioritizedSerialExecutor(NativeThread.NORM_PRIORITY, 
NativeThread.MAX_PRIORITY+1, NativeThread.NORM_PRIORITY, true);
                datastoreCheckerExecutor = new 
PrioritizedSerialExecutor(NativeThread.NORM_PRIORITY, 
RequestStarter.NUMBER_OF_PRIORITY_CLASSES, 0, false);


Reply via email to