Author: toad
Date: 2008-10-14 17:42:02 +0000 (Tue, 14 Oct 2008)
New Revision: 22985
Added:
branches/db4o/freenet/src/freenet/node/NodeRestartJobsQueue.java
Modified:
branches/db4o/freenet/src/freenet/client/async/ClientGetter.java
branches/db4o/freenet/src/freenet/client/async/DBJobRunner.java
branches/db4o/freenet/src/freenet/client/async/InsertCompressor.java
branches/db4o/freenet/src/freenet/client/async/PersistentChosenRequest.java
branches/db4o/freenet/src/freenet/node/NodeClientCore.java
branches/db4o/freenet/src/freenet/support/io/BucketChainBucket.java
branches/db4o/freenet/src/freenet/support/io/BucketChainBucketFactory.java
Log:
When compressing a big file into a chain of buckets for a persistent insert,
write the buckets to disk every 1024th bucket, to avoid a single huge commit
running out of memory. Also queue a job to remove the buckets on the next
startup if storeTo() has not been called by then, i.e. if we crash first.
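
To make the mechanism above concrete, here is a minimal sketch of the pattern (not the
actual BucketChainBucket code, which follows in the diffs below): progressively store the
partially built object every 1024th bucket via runBlocking(), register a cleanup DBJob
with queueRestartJob() so a crash leaves nothing orphaned, and cancel that job from
storeTo() with removeRestartJob(). Only the DBJobRunner / DBJob / ObjectContainer APIs
are taken from this revision; the class, its fields, and the import path assumed for
NativeThread are illustrative.

import com.db4o.ObjectContainer;

import freenet.client.async.ClientContext;
import freenet.client.async.DBJob;
import freenet.client.async.DBJobRunner;
import freenet.support.io.NativeThread;

class ProgressiveStoreSketch {

	private final DBJobRunner runner;
	private int bucketCount;
	private boolean stored;

	ProgressiveStoreSketch(DBJobRunner runner) {
		this.runner = runner;
	}

	// Runs on the next startup if storeTo() was never reached (e.g. we crashed):
	// delete whatever was progressively written so it does not leak in the database.
	private final DBJob cleanupOnRestart = new DBJob() {
		public void run(ObjectContainer container, ClientContext context) {
			container.activate(ProgressiveStoreSketch.this, 1);
			if(stored) return;
			container.delete(ProgressiveStoreSketch.this);
		}
	};

	void onBucketAdded() {
		bucketCount++;
		if(runner == null || stored || bucketCount % 1024 != 0) return;
		// Flush the partial state in a small transaction, and make sure the
		// cleanup job is queued in case we never finish.
		runner.runBlocking(new DBJob() {
			public void run(ObjectContainer container, ClientContext context) {
				container.store(ProgressiveStoreSketch.this);
				runner.queueRestartJob(cleanupOnRestart, NativeThread.HIGH_PRIORITY, container);
			}
		}, NativeThread.HIGH_PRIORITY);
	}

	void storeTo(ObjectContainer container) {
		stored = true;
		container.store(this);
		// Fully persisted: the delete-on-startup job must not run any more.
		runner.removeRestartJob(cleanupOnRestart, NativeThread.HIGH_PRIORITY, container);
	}
}
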
Modified: branches/db4o/freenet/src/freenet/client/async/ClientGetter.java
===================================================================
--- branches/db4o/freenet/src/freenet/client/async/ClientGetter.java	2008-10-13 21:11:15 UTC (rev 22984)
+++ branches/db4o/freenet/src/freenet/client/async/ClientGetter.java	2008-10-14 17:42:02 UTC (rev 22985)
@@ -169,8 +169,10 @@
					Logger.minor(this, "client.async returned data in returnBucket");
				}
FetchResult res = result;
- if(persistent())
+ if(persistent()) {
container.store(this);
+ container.activate(clientCallback, 1);
+ }
clientCallback.onSuccess(res, ClientGetter.this, container);
}
Modified: branches/db4o/freenet/src/freenet/client/async/DBJobRunner.java
===================================================================
--- branches/db4o/freenet/src/freenet/client/async/DBJobRunner.java	2008-10-13 21:11:15 UTC (rev 22984)
+++ branches/db4o/freenet/src/freenet/client/async/DBJobRunner.java	2008-10-14 17:42:02 UTC (rev 22985)
@@ -3,6 +3,8 @@
* http://www.gnu.org/ for further details of the GPL. */
package freenet.client.async;
+import com.db4o.ObjectContainer;
+
/**
* Interface for an object which queues and runs DBJob's.
* @author toad
@@ -10,9 +12,21 @@
public interface DBJobRunner {
public void queue(DBJob job, int priority, boolean checkDupes);
+
+	/** Run this database job blocking. If we are already on the database thread,
+	 * run it inline, otherwise schedule it at the specified priority and wait for
+	 * it to finish. */
+	public void runBlocking(DBJob job, int priority);
public boolean onDatabaseThread();
public int getQueueSize(int priority);
-
+
+ /** Queue a database job to be executed just after restart.
+ * All such jobs must be completed before any bucket cleanup occurs. */
+	public void queueRestartJob(DBJob job, int priority, ObjectContainer container);
+
+ /** Remove a queued on-restart database job. */
+	public void removeRestartJob(DBJob job, int priority, ObjectContainer container);
+
}
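
For reference, a minimal usage sketch of the runBlocking() contract added above: the call
either executes the job inline when already on the database thread, or queues it and blocks
until it has run, so the caller can rely on the database change having been made when it
returns. Only the DBJobRunner / DBJob signatures above are taken from the commit; the helper
class, the persisted object, the priority chosen and the NativeThread import path are
illustrative.

import com.db4o.ObjectContainer;

import freenet.client.async.ClientContext;
import freenet.client.async.DBJob;
import freenet.client.async.DBJobRunner;
import freenet.support.io.NativeThread;

class RunBlockingUsageSketch {

	// Delete a persisted object and only return once the deletion has actually
	// been executed on the database thread.
	static void deleteSynchronously(DBJobRunner runner, final Object persisted) {
		runner.runBlocking(new DBJob() {
			public void run(ObjectContainer container, ClientContext context) {
				container.delete(persisted);
			}
		}, NativeThread.HIGH_PRIORITY);
	}
}
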
Modified: branches/db4o/freenet/src/freenet/client/async/InsertCompressor.java
===================================================================
--- branches/db4o/freenet/src/freenet/client/async/InsertCompressor.java	2008-10-13 21:11:15 UTC (rev 22984)
+++ branches/db4o/freenet/src/freenet/client/async/InsertCompressor.java	2008-10-14 17:42:02 UTC (rev 22985)
@@ -123,7 +123,7 @@
				Compressor comp = Compressor.getCompressionAlgorithmByDifficulty(i);
				Bucket result;
-				result = comp.compress(origData, new BucketChainBucketFactory(bucketFactory, NodeCHK.BLOCK_SIZE), origData.size());
+				result = comp.compress(origData, new BucketChainBucketFactory(bucketFactory, NodeCHK.BLOCK_SIZE, persistent ? context.jobRunner : null), origData.size());
if(result.size() < minSize) {
bestCodec = comp;
if(bestCompressedData != null)
@@ -152,6 +152,8 @@
						public void run(ObjectContainer container, ClientContext context) {
							if(container.ext().isActive(inserter))
								Logger.error(this, "ALREADY ACTIVE in compressed callback: "+inserter);
+							// Must call storeTo at this point to cancel the delete-on-startup job.
+							output.data.storeTo(container);
							container.activate(inserter, 1);
							inserter.onCompressed(output, container, context);
							container.deactivate(inserter, 1);
Modified: branches/db4o/freenet/src/freenet/client/async/PersistentChosenRequest.java
===================================================================
--- branches/db4o/freenet/src/freenet/client/async/PersistentChosenRequest.java	2008-10-13 21:11:15 UTC (rev 22984)
+++ branches/db4o/freenet/src/freenet/client/async/PersistentChosenRequest.java	2008-10-14 17:42:02 UTC (rev 22985)
@@ -204,8 +204,9 @@
					container.commit(); // db4o is read-committed, so we need to commit here.
}
} else /*if(request instanceof SendableInsert)*/ {
+ container.activate(request, 1);
for(PersistentChosenBlock block : finishedBlocks) {
- container.activate(request, 1);
+ container.activate(block, 1);
if(block.insertSucceeded()) {
((SendableInsert)request).onSuccess(block.token, container, context);
					container.commit(); // db4o is read-committed, so we need to commit here.
Modified: branches/db4o/freenet/src/freenet/node/NodeClientCore.java
===================================================================
--- branches/db4o/freenet/src/freenet/node/NodeClientCore.java	2008-10-13 21:11:15 UTC (rev 22984)
+++ branches/db4o/freenet/src/freenet/node/NodeClientCore.java	2008-10-14 17:42:02 UTC (rev 22985)
@@ -60,6 +60,7 @@
import freenet.support.Base64;
import freenet.support.Executor;
import freenet.support.Logger;
+import freenet.support.MutableBoolean;
import freenet.support.OOMHandler;
import freenet.support.OOMHook;
import freenet.support.PrioritizedSerialExecutor;
@@ -86,6 +87,7 @@
public final ArchiveManager archiveManager;
public final RequestStarterGroup requestStarters;
private final HealingQueue healingQueue;
+ public final NodeRestartJobsQueue restartJobsQueue;
	/** Must be included as a hidden field in order for any dangerous HTTP operation to complete successfully. */
public final String formPassword;
File downloadDir;
@@ -136,6 +138,7 @@
* of them, so only cache a small number of them */
private static final int FEC_QUEUE_CACHE_SIZE = 20;
private UserAlert startingUpAlert;
+ private DBJob[] startupDatabaseJobs;
	NodeClientCore(Node node, Config config, SubConfig nodeConfig, File nodeDir, int portNumber, int sortOrder, SimpleFieldSet oldConfig, SubConfig fproxyConfig, SimpleToadletServer toadlets, ObjectContainer container) throws NodeInitException {
this.node = node;
@@ -150,6 +153,8 @@
this.formPassword = Base64.encode(pwdBuf);
alerts = new UserAlertManager(this);
logMINOR = Logger.shouldLog(Logger.MINOR, this);
+		restartJobsQueue = NodeRestartJobsQueue.init(node.nodeDBHandle, container);
+		startupDatabaseJobs = restartJobsQueue.getRestartDatabaseJobs(container);
		persister = new ConfigurablePersister(this, nodeConfig, "clientThrottleFile", "client-throttle.dat", sortOrder++, true, false, "NodeClientCore.fileForClientStats", "NodeClientCore.fileForClientStatsLong", node.ps, nodeDir);
@@ -565,6 +570,21 @@
return NativeThread.LOW_PRIORITY;
}
}, "Startup completion thread");
+
+ queue(new DBJob() {
+
+			public void run(ObjectContainer container, ClientContext context) {
+				for(int i=0;i<startupDatabaseJobs.length;i++) {
+					try {
+						startupDatabaseJobs[i].run(container, context);
+					} catch (Throwable t) {
+						Logger.error(this, "Caught "+t+" in startup job "+startupDatabaseJobs[i], t);
+ }
+ }
+ startupDatabaseJobs = null;
+ }
+
+ }, NativeThread.HIGH_PRIORITY, false);
}
public interface SimpleRequestSenderCompletionListener {
@@ -1367,4 +1387,43 @@
System.err.println("Out of memory: Emergency shutdown to
protect database integrity in progress...");
System.exit(NodeInitException.EXIT_OUT_OF_MEMORY_PROTECTING_DATABASE);
}
+
+	public void queueRestartJob(DBJob job, int priority, ObjectContainer container) {
+		restartJobsQueue.queueRestartJob(job, priority, container);
+	}
+
+	public void removeRestartJob(DBJob job, int priority, ObjectContainer container) {
+ restartJobsQueue.removeRestartJob(job, priority, container);
+ }
+
+ public void runBlocking(final DBJob job, int priority) {
+ if(clientDatabaseExecutor.onThread()) {
+ job.run(node.db, clientContext);
+ } else {
+ final MutableBoolean finished = new MutableBoolean();
+ queue(new DBJob() {
+
+				public void run(ObjectContainer container, ClientContext context) {
+ try {
+ job.run(container, context);
+ } finally {
+ synchronized(finished) {
+ finished.value = true;
+ finished.notifyAll();
+ }
+ }
+ }
+
+ }, priority, false);
+ synchronized(finished) {
+ while(!finished.value) {
+ try {
+ finished.wait();
+ } catch (InterruptedException e) {
+ // Ignore
+ }
+ }
+ }
+ }
+ }
}
Added: branches/db4o/freenet/src/freenet/node/NodeRestartJobsQueue.java
===================================================================
--- branches/db4o/freenet/src/freenet/node/NodeRestartJobsQueue.java	                        (rev 0)
+++ branches/db4o/freenet/src/freenet/node/NodeRestartJobsQueue.java	2008-10-14 17:42:02 UTC (rev 22985)
@@ -0,0 +1,96 @@
+package freenet.node;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Set;
+
+import com.db4o.ObjectContainer;
+import com.db4o.ObjectSet;
+import com.db4o.query.Predicate;
+
+import freenet.client.async.DBJob;
+import freenet.support.Logger;
+
+public class NodeRestartJobsQueue {
+
+ private final long nodeDBHandle;
+
+ public NodeRestartJobsQueue(long nodeDBHandle2) {
+ nodeDBHandle = nodeDBHandle2;
+ dbJobs = new Set[RequestStarter.NUMBER_OF_PRIORITY_CLASSES];
+ for(int i=0;i<dbJobs.length;i++)
+ dbJobs[i] = new HashSet<DBJob>();
+ }
+
+	public static NodeRestartJobsQueue init(final long nodeDBHandle, ObjectContainer container) {
+ ObjectSet<NodeRestartJobsQueue> results =
+ container.query(new Predicate<NodeRestartJobsQueue>() {
+
+ @Override
+ public boolean match(NodeRestartJobsQueue arg0) {
+ return (arg0.nodeDBHandle == nodeDBHandle);
+ }
+
+ });
+ if(results.hasNext()) {
+			NodeRestartJobsQueue queue = (NodeRestartJobsQueue) results.next();
+ container.activate(queue, 1);
+ queue.onInit(container);
+ return queue;
+ }
+		NodeRestartJobsQueue queue = new NodeRestartJobsQueue(nodeDBHandle);
+ container.store(queue);
+ return queue;
+ }
+
+ private void onInit(ObjectContainer container) {
+ // FIXME do something, maybe activate?
+ }
+
+ private final Set<DBJob>[] dbJobs;
+
+	public void queueRestartJob(DBJob job, int priority, ObjectContainer container) {
+ container.activate(dbJobs[priority], 1);
+ dbJobs[priority].add(job);
+ container.store(dbJobs[priority]);
+ container.deactivate(dbJobs[priority], 1);
+ }
+
+	public void removeRestartJob(DBJob job, int priority, ObjectContainer container) {
+		boolean jobWasActive = container.ext().isActive(job);
+		if(!jobWasActive) container.activate(job, 1);
+		container.activate(dbJobs[priority], 1);
+		if(!dbJobs[priority].remove(job)) {
+			int found = 0;
+			for(int i=0;i<dbJobs.length;i++) {
+				container.activate(dbJobs[i], 1);
+				if(dbJobs[i].remove(job)) {
+					container.store(dbJobs[i]);
+					found++;
+				}
+				container.deactivate(dbJobs[i], 1);
+			}
+			if(found > 0)
+				Logger.error(this, "Job "+job+" not in specified priority "+priority+" found in "+found+" other priorities when removing");
+			else
+				Logger.error(this, "Job "+job+" not found when removing it");
+ } else {
+ container.store(dbJobs[priority]);
+ container.deactivate(dbJobs[priority], 1);
+ }
+ if(!jobWasActive) container.deactivate(job, 1);
+ }
+
+ DBJob[] getRestartDatabaseJobs(ObjectContainer container) {
+ ArrayList<DBJob> list = new ArrayList<DBJob>();
+ for(int i=0;i<dbJobs.length;i++) {
+ container.activate(dbJobs[i], 1);
+ list.addAll(dbJobs[i]);
+ dbJobs[i].clear();
+ container.store(dbJobs[i]);
+ container.deactivate(dbJobs[i], 1);
+ }
+ return list.toArray(new DBJob[list.size()]);
+ }
+
+}
Modified: branches/db4o/freenet/src/freenet/support/io/BucketChainBucket.java
===================================================================
--- branches/db4o/freenet/src/freenet/support/io/BucketChainBucket.java	2008-10-13 21:11:15 UTC (rev 22984)
+++ branches/db4o/freenet/src/freenet/support/io/BucketChainBucket.java	2008-10-14 17:42:02 UTC (rev 22985)
@@ -11,6 +11,9 @@
import com.db4o.ObjectContainer;
+import freenet.client.async.ClientContext;
+import freenet.client.async.DBJob;
+import freenet.client.async.DBJobRunner;
import freenet.support.api.Bucket;
import freenet.support.api.BucketFactory;
@@ -22,11 +25,20 @@
private boolean freed;
private boolean readOnly;
private final BucketFactory bf;
-
- public BucketChainBucket(long bucketSize, BucketFactory bf) {
+ private final DBJobRunner dbJobRunner;
+ private boolean stored;
+
+ /**
+ * @param bucketSize
+ * @param bf
+	 * @param dbJobRunner If not null, use this to store buckets to disk progressively
+	 * to avoid a big transaction at the end. Caller then MUST call storeTo() at some point.
+	 */
+	public BucketChainBucket(long bucketSize, BucketFactory bf, DBJobRunner dbJobRunner) {
this.bucketSize = bucketSize;
this.buckets = new Vector<Bucket>();
this.bf = bf;
+ this.dbJobRunner = dbJobRunner;
size = 0;
freed = false;
readOnly = false;
@@ -38,9 +50,19 @@
this.size = size2;
this.readOnly = readOnly;
this.bf = bf2;
+ dbJobRunner = null;
}
public void free() {
+ if(dbJobRunner != null) {
+ dbJobRunner.runBlocking(new DBJob() {
+
+				public void run(ObjectContainer container, ClientContext context) {
+ removeFrom(container);
+ }
+
+ }, NativeThread.HIGH_PRIORITY);
+ }
Bucket[] list;
synchronized(this) {
list = getBuckets();
@@ -272,6 +294,17 @@
};
}
+ private final DBJob killMe = new DBJob() {
+
+		public void run(ObjectContainer container, ClientContext context) {
+ container.activate(BucketChainBucket.this, 1);
+ if(stored) return;
+ System.err.println("Freeing unfinished unstored bucket
"+this);
+ removeFrom(container);
+ }
+
+ };
+
	protected OutputStream makeBucketOutputStream(int i) throws IOException {
Bucket bucket = bf.makeBucket(bucketSize);
buckets.add(bucket);
@@ -279,6 +312,16 @@
throw new IllegalStateException("Added bucket, size
should be " + (i + 1) + " but is " + buckets.size());
if (buckets.get(i) != bucket)
throw new IllegalStateException("Bucket got replaced.
Race condition?");
+ if(dbJobRunner != null && !stored && buckets.size() % 1024 ==
0) {
+ dbJobRunner.runBlocking(new DBJob() {
+
+				public void run(ObjectContainer container, ClientContext context) {
+					container.store(BucketChainBucket.this);
+					dbJobRunner.queueRestartJob(killMe, NativeThread.HIGH_PRIORITY, container);
+ }
+
+ }, NativeThread.HIGH_PRIORITY);
+ }
return bucket.getOutputStream();
}
@@ -297,8 +340,10 @@
public void storeTo(ObjectContainer container) {
for(int i=0;i<buckets.size();i++)
((Bucket) buckets.get(i)).storeTo(container);
+ stored = true;
container.store(buckets);
container.store(this);
+		dbJobRunner.removeRestartJob(killMe, NativeThread.HIGH_PRIORITY, container);
}
public void removeFrom(ObjectContainer container) {
@@ -311,6 +356,8 @@
list[i].removeFrom(container);
container.delete(buckets);
container.delete(this);
+ stored = false;
+		dbJobRunner.removeRestartJob(killMe, NativeThread.HIGH_PRIORITY, container);
}
public Bucket createShadow() throws IOException {
Modified: branches/db4o/freenet/src/freenet/support/io/BucketChainBucketFactory.java
===================================================================
--- branches/db4o/freenet/src/freenet/support/io/BucketChainBucketFactory.java	2008-10-13 21:11:15 UTC (rev 22984)
+++ branches/db4o/freenet/src/freenet/support/io/BucketChainBucketFactory.java	2008-10-14 17:42:02 UTC (rev 22985)
@@ -2,6 +2,7 @@
import java.io.IOException;
+import freenet.client.async.DBJobRunner;
import freenet.support.api.Bucket;
import freenet.support.api.BucketFactory;
@@ -9,14 +10,24 @@
final BucketFactory factory;
final int blockSize;
+ final DBJobRunner runner;
-	public BucketChainBucketFactory(BucketFactory bucketFactory, int block_size) {
+	/**
+	 * If you want persistent buckets which will be saved every 1024 buckets, and
+	 * deleted on restart if not stored by then, then pass in the DBJobRunner.
+	 * Otherwise pass in null.
+	 * @param bucketFactory
+	 * @param block_size
+	 * @param runner
+	 */
+	public BucketChainBucketFactory(BucketFactory bucketFactory, int block_size, DBJobRunner runner) {
this.factory = bucketFactory;
this.blockSize = block_size;
+ this.runner = runner;
}
public Bucket makeBucket(long size) throws IOException {
- return new BucketChainBucket(blockSize, factory);
+ return new BucketChainBucket(blockSize, factory, runner);
}
}