Author: toad
Date: 2008-10-14 17:42:02 +0000 (Tue, 14 Oct 2008)
New Revision: 22985

Added:
   branches/db4o/freenet/src/freenet/node/NodeRestartJobsQueue.java
Modified:
   branches/db4o/freenet/src/freenet/client/async/ClientGetter.java
   branches/db4o/freenet/src/freenet/client/async/DBJobRunner.java
   branches/db4o/freenet/src/freenet/client/async/InsertCompressor.java
   branches/db4o/freenet/src/freenet/client/async/PersistentChosenRequest.java
   branches/db4o/freenet/src/freenet/node/NodeClientCore.java
   branches/db4o/freenet/src/freenet/support/io/BucketChainBucket.java
   branches/db4o/freenet/src/freenet/support/io/BucketChainBucketFactory.java
Log:
When compressing a big file into a chain of buckets for a persistent insert, 
write to disk every 1024th bucket, to avoid huge commits running out of 
memory. Queue a job to remove the blocks on startup if storeTo() has not 
been called on the bucket by the next startup, i.e. if we crash etc.


Modified: branches/db4o/freenet/src/freenet/client/async/ClientGetter.java
===================================================================
--- branches/db4o/freenet/src/freenet/client/async/ClientGetter.java    
2008-10-13 21:11:15 UTC (rev 22984)
+++ branches/db4o/freenet/src/freenet/client/async/ClientGetter.java    
2008-10-14 17:42:02 UTC (rev 22985)
@@ -169,8 +169,10 @@
                                Logger.minor(this, "client.async returned data 
in returnBucket");
                }
                FetchResult res = result;
-               if(persistent())
+               if(persistent()) {
                        container.store(this);
+                       container.activate(clientCallback, 1);
+               }
                clientCallback.onSuccess(res, ClientGetter.this, container);
        }


Modified: branches/db4o/freenet/src/freenet/client/async/DBJobRunner.java
===================================================================
--- branches/db4o/freenet/src/freenet/client/async/DBJobRunner.java     
2008-10-13 21:11:15 UTC (rev 22984)
+++ branches/db4o/freenet/src/freenet/client/async/DBJobRunner.java     
2008-10-14 17:42:02 UTC (rev 22985)
@@ -3,6 +3,8 @@
  * http://www.gnu.org/ for further details of the GPL. */
 package freenet.client.async;

+import com.db4o.ObjectContainer;
+
 /**
  * Interface for an object which queues and runs DBJob's.
  * @author toad
@@ -10,9 +12,21 @@
 public interface DBJobRunner {

+       /** Queue a database job at the given priority.
+        * NOTE(review): checkDupes presumably asks the runner not to queue a job
+        * that is already queued - confirm against the implementation. */
        public void queue(DBJob job, int priority, boolean checkDupes);
+       
+       /** Run this database job and block until it has finished. If we are
+        * already on the database thread, run it inline, otherwise schedule it at
+        * the specified priority and wait for it to finish.
+        * @param job The job to run.
+        * @param priority Priority to schedule the job at if it cannot be run inline. */
+       public void runBlocking(DBJob job, int priority);

+       /** NOTE(review): presumably returns true iff the calling thread is the
+        * database thread - confirm against the implementation. */
        public boolean onDatabaseThread();

+       /** NOTE(review): presumably the number of jobs currently queued at the
+        * given priority - confirm against the implementation. */
        public int getQueueSize(int priority);
-
+       
+       /** Queue a database job to be executed just after restart.
+        * All such jobs must be completed before any bucket cleanup occurs.
+        * @param job The job to run after the next restart.
+        * @param priority Priority class the job is filed under.
+        * @param container Database handle used to persist the queued job. */
+       public void queueRestartJob(DBJob job, int priority, ObjectContainer 
container);
+       
+       /** Remove a queued on-restart database job.
+        * @param job The job to remove.
+        * @param priority Priority class the job was queued under.
+        * @param container Database handle used to persist the removal. */
+       public void removeRestartJob(DBJob job, int priority, ObjectContainer 
container);
+       
 }

Modified: branches/db4o/freenet/src/freenet/client/async/InsertCompressor.java
===================================================================
--- branches/db4o/freenet/src/freenet/client/async/InsertCompressor.java        
2008-10-13 21:11:15 UTC (rev 22984)
+++ branches/db4o/freenet/src/freenet/client/async/InsertCompressor.java        
2008-10-14 17:42:02 UTC (rev 22985)
@@ -123,7 +123,7 @@

                                Compressor comp = 
Compressor.getCompressionAlgorithmByDifficulty(i);
                                Bucket result;
-                               result = comp.compress(origData, new 
BucketChainBucketFactory(bucketFactory, NodeCHK.BLOCK_SIZE), origData.size());
+                               result = comp.compress(origData, new 
BucketChainBucketFactory(bucketFactory, NodeCHK.BLOCK_SIZE, persistent ? 
context.jobRunner : null), origData.size());
                                if(result.size() < minSize) {
                                        bestCodec = comp;
                                        if(bestCompressedData != null)
@@ -152,6 +152,8 @@
                                        public void run(ObjectContainer 
container, ClientContext context) {
                                                
if(container.ext().isActive(inserter))
                                                        Logger.error(this, 
"ALREADY ACTIVE in compressed callback: "+inserter);
+                                               // Must call storeTo at this 
point to cancel the delete-on-startup job.
+                                               output.data.storeTo(container);
                                                container.activate(inserter, 1);
                                                inserter.onCompressed(output, 
container, context);
                                                container.deactivate(inserter, 
1);

Modified: 
branches/db4o/freenet/src/freenet/client/async/PersistentChosenRequest.java
===================================================================
--- branches/db4o/freenet/src/freenet/client/async/PersistentChosenRequest.java 
2008-10-13 21:11:15 UTC (rev 22984)
+++ branches/db4o/freenet/src/freenet/client/async/PersistentChosenRequest.java 
2008-10-14 17:42:02 UTC (rev 22985)
@@ -204,8 +204,9 @@
                                container.commit(); // db4o is read-committed, 
so we need to commit here.
                        }
                } else /*if(request instanceof SendableInsert)*/ {
+                       container.activate(request, 1);
                        for(PersistentChosenBlock block : finishedBlocks) {
-                               container.activate(request, 1);
+                               container.activate(block, 1);
                                if(block.insertSucceeded()) {
                                        
((SendableInsert)request).onSuccess(block.token, container, context);
                                        container.commit(); // db4o is 
read-committed, so we need to commit here.

Modified: branches/db4o/freenet/src/freenet/node/NodeClientCore.java
===================================================================
--- branches/db4o/freenet/src/freenet/node/NodeClientCore.java  2008-10-13 
21:11:15 UTC (rev 22984)
+++ branches/db4o/freenet/src/freenet/node/NodeClientCore.java  2008-10-14 
17:42:02 UTC (rev 22985)
@@ -60,6 +60,7 @@
 import freenet.support.Base64;
 import freenet.support.Executor;
 import freenet.support.Logger;
+import freenet.support.MutableBoolean;
 import freenet.support.OOMHandler;
 import freenet.support.OOMHook;
 import freenet.support.PrioritizedSerialExecutor;
@@ -86,6 +87,7 @@
        public final ArchiveManager archiveManager;
        public final RequestStarterGroup requestStarters;
        private final HealingQueue healingQueue;
+       public final NodeRestartJobsQueue restartJobsQueue;
        /** Must be included as a hidden field in order for any dangerous HTTP 
operation to complete successfully. */
        public final String formPassword;
        File downloadDir;
@@ -136,6 +138,7 @@
         * of them, so only cache a small number of them */
        private static final int FEC_QUEUE_CACHE_SIZE = 20;
        private UserAlert startingUpAlert;
+       private DBJob[] startupDatabaseJobs;

        NodeClientCore(Node node, Config config, SubConfig nodeConfig, File 
nodeDir, int portNumber, int sortOrder, SimpleFieldSet oldConfig, SubConfig 
fproxyConfig, SimpleToadletServer toadlets, ObjectContainer container) throws 
NodeInitException {
                this.node = node;
@@ -150,6 +153,8 @@
                this.formPassword = Base64.encode(pwdBuf);
                alerts = new UserAlertManager(this);
                logMINOR = Logger.shouldLog(Logger.MINOR, this);
+               restartJobsQueue = NodeRestartJobsQueue.init(node.nodeDBHandle, 
container);
+               startupDatabaseJobs = 
restartJobsQueue.getRestartDatabaseJobs(container);

                persister = new ConfigurablePersister(this, nodeConfig, 
"clientThrottleFile", "client-throttle.dat", sortOrder++, true, false,
                        "NodeClientCore.fileForClientStats", 
"NodeClientCore.fileForClientStatsLong", node.ps, nodeDir);
@@ -565,6 +570,21 @@
                                return NativeThread.LOW_PRIORITY;
                        }
                }, "Startup completion thread");
+               
+               queue(new DBJob() {
+
+                       public void run(ObjectContainer container, 
ClientContext context) {
+                               for(int i=0;i<startupDatabaseJobs.length;i++) {
+                                       try {
+                                               
startupDatabaseJobs[i].run(container, context);
+                                       } catch (Throwable t) {
+                                               Logger.error(this, "Caught 
"+t+" in startup job "+startupDatabaseJobs[i], t);
+                                       }
+                               }
+                               startupDatabaseJobs = null;
+                       }
+                       
+               }, NativeThread.HIGH_PRIORITY, false);
        }

        public interface SimpleRequestSenderCompletionListener {
@@ -1367,4 +1387,43 @@
                System.err.println("Out of memory: Emergency shutdown to 
protect database integrity in progress...");
                
System.exit(NodeInitException.EXIT_OUT_OF_MEMORY_PROTECTING_DATABASE);
        }
+
+       /**
+        * Queue a database job to be run after the next restart.
+        * Delegates to restartJobsQueue, which files the job under the given
+        * priority and stores the change in the database.
+        */
+       public void queueRestartJob(DBJob job, int priority, ObjectContainer 
container) {
+               restartJobsQueue.queueRestartJob(job, priority, container);
+       }
+
+       /**
+        * Cancel a job previously queued with queueRestartJob().
+        * Delegates to restartJobsQueue, which logs an error if the job is not
+        * found under the given priority.
+        */
+       public void removeRestartJob(DBJob job, int priority, ObjectContainer 
container) {
+               restartJobsQueue.removeRestartJob(job, priority, container);
+       }
+
+       public void runBlocking(final DBJob job, int priority) {
+               if(clientDatabaseExecutor.onThread()) {
+                       job.run(node.db, clientContext);
+               } else {
+                       final MutableBoolean finished = new MutableBoolean();
+                       queue(new DBJob() {
+
+                               public void run(ObjectContainer container, 
ClientContext context) {
+                                       try {
+                                               job.run(container, context);
+                                       } finally {
+                                               synchronized(finished) {
+                                                       finished.value = true;
+                                                       finished.notifyAll();
+                                               }
+                                       }
+                               }
+                               
+                       }, priority, false);
+                       synchronized(finished) {
+                               while(!finished.value) {
+                                       try {
+                                               finished.wait();
+                                       } catch (InterruptedException e) {
+                                               // Ignore
+                                       }
+                               }
+                       }
+               }
+       }
 }

Added: branches/db4o/freenet/src/freenet/node/NodeRestartJobsQueue.java
===================================================================
--- branches/db4o/freenet/src/freenet/node/NodeRestartJobsQueue.java            
                (rev 0)
+++ branches/db4o/freenet/src/freenet/node/NodeRestartJobsQueue.java    
2008-10-14 17:42:02 UTC (rev 22985)
@@ -0,0 +1,96 @@
+package freenet.node;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Set;
+
+import com.db4o.ObjectContainer;
+import com.db4o.ObjectSet;
+import com.db4o.query.Predicate;
+
+import freenet.client.async.DBJob;
+import freenet.support.Logger;
+
+public class NodeRestartJobsQueue {
+       
+       private final long nodeDBHandle;
+
+       public NodeRestartJobsQueue(long nodeDBHandle2) {
+               nodeDBHandle = nodeDBHandle2;
+               dbJobs = new Set[RequestStarter.NUMBER_OF_PRIORITY_CLASSES];
+               for(int i=0;i<dbJobs.length;i++)
+                       dbJobs[i] = new HashSet<DBJob>();
+       }
+
+       public static NodeRestartJobsQueue init(final long nodeDBHandle, 
ObjectContainer container) {
+               ObjectSet<NodeRestartJobsQueue> results = 
+                       container.query(new Predicate<NodeRestartJobsQueue>() {
+
+                       @Override
+                       public boolean match(NodeRestartJobsQueue arg0) {
+                               return (arg0.nodeDBHandle == nodeDBHandle);
+                       }
+                       
+               });
+               if(results.hasNext()) {
+                       NodeRestartJobsQueue queue = (NodeRestartJobsQueue) 
results.next();
+                       container.activate(queue, 1);
+                       queue.onInit(container);
+                       return queue;
+               }
+               NodeRestartJobsQueue queue = new 
NodeRestartJobsQueue(nodeDBHandle);
+               container.store(queue);
+               return queue;
+       }
+
+       private void onInit(ObjectContainer container) {
+               // FIXME do something, maybe activate?
+       }
+
+       private final Set<DBJob>[] dbJobs;
+       
+       public void queueRestartJob(DBJob job, int priority, ObjectContainer 
container) {
+               container.activate(dbJobs[priority], 1);
+               dbJobs[priority].add(job);
+               container.store(dbJobs[priority]);
+               container.deactivate(dbJobs[priority], 1);
+       }
+       
+       public void removeRestartJob(DBJob job, int priority, ObjectContainer 
container) {
+               boolean jobWasActive = container.ext().isActive(job);
+               if(!jobWasActive) container.activate(job, 1);
+               container.activate(dbJobs[priority], 1);
+               if(!dbJobs[priority].remove(job)) {
+                       int found = 0;
+                       for(int i=0;i<dbJobs.length;i++) {
+                               container.activate(dbJobs[priority], 1);
+                               if(dbJobs[priority].remove(job)) {
+                                       container.store(dbJobs[priority]);
+                                       found++;
+                               }
+                               container.deactivate(dbJobs[priority], 1);
+                       }
+                       if(found > 0)
+                               Logger.error(this, "Job "+job+" not in 
specified priority "+priority+" found in "+found+" other priorities when 
removing");
+                       else
+                               Logger.error(this, "Job "+job+" not found when 
removing it");
+               } else {
+                       container.store(dbJobs[priority]);
+                       container.deactivate(dbJobs[priority], 1);
+               }
+               if(!jobWasActive) container.deactivate(job, 1);
+       }
+       
+       DBJob[] getRestartDatabaseJobs(ObjectContainer container) {
+               ArrayList<DBJob> list = new ArrayList<DBJob>();
+               for(int i=0;i<dbJobs.length;i++) {
+                       container.activate(dbJobs[i], 1);
+                       list.addAll(dbJobs[i]);
+                       dbJobs[i].clear();
+                       container.store(dbJobs[i]);
+                       container.deactivate(dbJobs[i], 1);
+               }
+               return list.toArray(new DBJob[list.size()]);
+       }
+       
+}

Modified: branches/db4o/freenet/src/freenet/support/io/BucketChainBucket.java
===================================================================
--- branches/db4o/freenet/src/freenet/support/io/BucketChainBucket.java 
2008-10-13 21:11:15 UTC (rev 22984)
+++ branches/db4o/freenet/src/freenet/support/io/BucketChainBucket.java 
2008-10-14 17:42:02 UTC (rev 22985)
@@ -11,6 +11,9 @@

 import com.db4o.ObjectContainer;

+import freenet.client.async.ClientContext;
+import freenet.client.async.DBJob;
+import freenet.client.async.DBJobRunner;
 import freenet.support.api.Bucket;
 import freenet.support.api.BucketFactory;

@@ -22,11 +25,20 @@
        private boolean freed;
        private boolean readOnly;
        private final BucketFactory bf;
-       
-       public BucketChainBucket(long bucketSize, BucketFactory bf) {
+       private final DBJobRunner dbJobRunner;
+       private boolean stored;
+
+       /**
+        * @param bucketSize
+        * @param bf
+        * @param dbJobRunner If not null, use this to store buckets to disk 
progressively
+        * to avoid a big transaction at the end. Caller then MUST call 
storeTo() at some point.
+        */
+       public BucketChainBucket(long bucketSize, BucketFactory bf, DBJobRunner 
dbJobRunner) {
                this.bucketSize = bucketSize;
                this.buckets = new Vector<Bucket>();
                this.bf = bf;
+               this.dbJobRunner = dbJobRunner;
                size = 0;
                freed = false;
                readOnly = false;
@@ -38,9 +50,19 @@
                this.size = size2;
                this.readOnly = readOnly;
                this.bf = bf2;
+               dbJobRunner = null;
        }

        public void free() {
+               if(dbJobRunner != null) {
+                       dbJobRunner.runBlocking(new DBJob() {
+
+                               public void run(ObjectContainer container, 
ClientContext context) {
+                                       removeFrom(container);
+                               }
+                               
+                       }, NativeThread.HIGH_PRIORITY);
+               }
                Bucket[] list;
                synchronized(this) {
                        list = getBuckets();
@@ -272,6 +294,17 @@
                };
        }

+       /**
+        * Delete-on-startup job for this bucket. When run it activates the
+        * bucket and, unless the stored flag has been set by storeTo(), removes
+        * the bucket from the database.
+        */
+       private final DBJob killMe = new DBJob() {
+
+               public void run(ObjectContainer container, ClientContext context) {
+                       container.activate(BucketChainBucket.this, 1);
+                       if(stored) return;
+                       // Log the bucket itself; a bare "this" here would be the anonymous DBJob.
+                       System.err.println("Freeing unfinished unstored bucket "+BucketChainBucket.this);
+                       removeFrom(container);
+               }
+               
+       };
+       
        protected OutputStream makeBucketOutputStream(int i) throws IOException 
{
                Bucket bucket = bf.makeBucket(bucketSize);
                buckets.add(bucket);
@@ -279,6 +312,16 @@
                        throw new IllegalStateException("Added bucket, size 
should be " + (i + 1) + " but is " + buckets.size());
                if (buckets.get(i) != bucket)
                        throw new IllegalStateException("Bucket got replaced. 
Race condition?");
+               if(dbJobRunner != null && !stored && buckets.size() % 1024 == 
0) {
+                       dbJobRunner.runBlocking(new DBJob() {
+
+                               public void run(ObjectContainer container, 
ClientContext context) {
+                                       container.store(BucketChainBucket.this);
+                                       dbJobRunner.queueRestartJob(killMe, 
NativeThread.HIGH_PRIORITY, container);
+                               }
+                               
+                       }, NativeThread.HIGH_PRIORITY);
+               }
                return bucket.getOutputStream();
        }

@@ -297,8 +340,10 @@
        public void storeTo(ObjectContainer container) {
                for(int i=0;i<buckets.size();i++)
                        ((Bucket) buckets.get(i)).storeTo(container);
+               stored = true;
                container.store(buckets);
                container.store(this);
+               dbJobRunner.removeRestartJob(killMe, 
NativeThread.HIGH_PRIORITY, container);
        }

        public void removeFrom(ObjectContainer container) {
@@ -311,6 +356,8 @@
                        list[i].removeFrom(container);
                container.delete(buckets);
                container.delete(this);
+               stored = false;
+               dbJobRunner.removeRestartJob(killMe, 
NativeThread.HIGH_PRIORITY, container);
        }

        public Bucket createShadow() throws IOException {

Modified: 
branches/db4o/freenet/src/freenet/support/io/BucketChainBucketFactory.java
===================================================================
--- branches/db4o/freenet/src/freenet/support/io/BucketChainBucketFactory.java  
2008-10-13 21:11:15 UTC (rev 22984)
+++ branches/db4o/freenet/src/freenet/support/io/BucketChainBucketFactory.java  
2008-10-14 17:42:02 UTC (rev 22985)
@@ -2,6 +2,7 @@

 import java.io.IOException;

+import freenet.client.async.DBJobRunner;
 import freenet.support.api.Bucket;
 import freenet.support.api.BucketFactory;

@@ -9,14 +10,24 @@

        final BucketFactory factory;
        final int blockSize;
+       final DBJobRunner runner;

-       public BucketChainBucketFactory(BucketFactory bucketFactory, int 
block_size) {
+       /**
+        * If you want persistent buckets which will be saved every 1024 buckets,
+        * and deleted on restart if not stored by then, then pass in the
+        * DBJobRunner. Otherwise pass in null.
+        * @param bucketFactory Factory handed to every BucketChainBucket created here.
+        * @param block_size Bucket size handed to every BucketChainBucket created here.
+        * @param runner Job runner for progressive storage, or null for none.
+        */
+       public BucketChainBucketFactory(BucketFactory bucketFactory, int 
block_size, DBJobRunner runner) {
                this.factory = bucketFactory;
                this.blockSize = block_size;
+               this.runner = runner;
        }

        public Bucket makeBucket(long size) throws IOException {
-               return new BucketChainBucket(blockSize, factory);
+               // The size argument is not used: the chain is built from the
+               // configured blockSize, factory and runner only.
+               return new BucketChainBucket(blockSize, factory, runner);
        }

 }


Reply via email to