Author: toad
Date: 2008-06-25 15:22:37 +0000 (Wed, 25 Jun 2008)
New Revision: 20691
Modified:
branches/db4o/freenet/src/freenet/client/async/InsertCompressor.java
Log:
Activation.
Don't schedule a compression job twice. With lazy query evaluation and the
activation changes, init() can be called more than once on the same
InsertCompressor. :|
For persistent inserts, call onStartCompression() on the database thread, and
activate the inserter first.
Modified: branches/db4o/freenet/src/freenet/client/async/InsertCompressor.java
===================================================================
--- branches/db4o/freenet/src/freenet/client/async/InsertCompressor.java
2008-06-25 15:20:27 UTC (rev 20690)
+++ branches/db4o/freenet/src/freenet/client/async/InsertCompressor.java
2008-06-25 15:22:37 UTC (rev 20691)
@@ -8,6 +8,7 @@
import freenet.client.InsertException;
import freenet.keys.NodeCHK;
+import freenet.support.Logger;
import freenet.support.api.Bucket;
import freenet.support.api.BucketFactory;
import freenet.support.compress.CompressionOutputSizeException;
@@ -38,6 +39,7 @@
/** BucketFactory */
public final BucketFactory bucketFactory;
public final boolean persistent;
+ private transient boolean scheduled;
public InsertCompressor(long nodeDBHandle2, SingleFileInserter
inserter2, Bucket origData2, int minSize2, BucketFactory bf, boolean
persistent) {
this.nodeDBHandle = nodeDBHandle2;
@@ -48,7 +50,21 @@
this.persistent = persistent;
}
- public void init(final ClientContext ctx) {
+ public void init(ObjectContainer container, final ClientContext ctx) {
+ if(persistent) {
+ container.activate(inserter, 1);
+ container.activate(origData, 1);
+ }
+ synchronized(this) {
+ // Can happen with the above activation and lazy query
evaluation.
+ if(scheduled) {
+ Logger.error(this, "Already scheduled
compression, not rescheduling");
+ return;
+ }
+ scheduled = true;
+ }
+ if(Logger.shouldLog(Logger.MINOR, this))
+ Logger.minor(this, "Compressing "+this+" :
origData.size="+origData.size()+" for "+inserter);
ctx.mainExecutor.execute(new Runnable() {
public void run() {
@@ -70,7 +86,18 @@
try {
for(int i=0;i<algos;i++) {
// Only produce if we are compressing *the
original data*
- inserter.onStartCompression(i, null, context);
+ if(persistent) {
+ final int phase = i;
+ context.jobRunner.queue(new DBJob() {
+
+ public void run(ObjectContainer
container, ClientContext context) {
+
container.activate(inserter, 1);
+
inserter.onStartCompression(phase, container, context);
+ }
+
+ }, NativeThread.NORM_PRIORITY, false);
+ }
+
Compressor comp =
Compressor.getCompressionAlgorithmByDifficulty(i);
Bucket result;
result = comp.compress(origData, new
BucketChainBucketFactory(bucketFactory, NodeCHK.BLOCK_SIZE), origData.size());
@@ -154,7 +181,7 @@
InsertCompressor compressor = new
InsertCompressor(context.nodeDBHandle, inserter, origData, minSize, bf,
persistent);
if(persistent)
container.set(compressor);
- compressor.init(context);
+ compressor.init(container, context);
return compressor;
}
@@ -168,7 +195,7 @@
});
while(results.hasNext()) {
InsertCompressor comp = (InsertCompressor)
results.next();
- comp.init(context);
+ comp.init(container, context);
}
}