Author: nextgens
Date: 2008-10-28 18:09:53 +0000 (Tue, 28 Oct 2008)
New Revision: 23155
Modified:
trunk/freenet/src/freenet/client/async/SingleFileInserter.java
trunk/freenet/src/freenet/support/compress/Compressor.java
Log:
Use java.util.concurrent.Semaphore to serialize compression-related operations
(not tested)
Modified: trunk/freenet/src/freenet/client/async/SingleFileInserter.java
===================================================================
--- trunk/freenet/src/freenet/client/async/SingleFileInserter.java
2008-10-28 18:09:24 UTC (rev 23154)
+++ trunk/freenet/src/freenet/client/async/SingleFileInserter.java
2008-10-28 18:09:53 UTC (rev 23155)
@@ -26,6 +26,7 @@
import freenet.support.io.BucketChainBucketFactory;
import freenet.support.io.BucketTools;
import freenet.support.io.NativeThread;
+import java.util.concurrent.Semaphore;
/**
* Attempt to insert a file. May include metadata.
@@ -108,10 +109,6 @@
OffThreadCompressor otc = new OffThreadCompressor();
ctx.executor.execute(otc, "Compressor for " + this);
}
-
- // Use a mutex to serialize compression (limit memory usage/IO)
- // Of course it doesn't make any sense on multi-core systems.
- private static final Object compressorSync = new Object();
private class OffThreadCompressor implements PrioRunnable {
public void run() {
@@ -140,7 +137,14 @@
}
private void tryCompress() throws InsertException {
- synchronized(compressorSync) {
+		// Take the permit BEFORE entering the try/finally below. If acquire()
+		// is interrupted we never held a permit, so the finally block must not
+		// run release(): Semaphore.release() always adds a permit, whether or
+		// not the caller acquired one, which would raise the concurrency limit.
+		try {
+			COMPRESSOR_TYPE.compressorSemaphore.acquire();
+		} catch (InterruptedException e) {
+			// should not happen
+			Logger.error(this, "Caught an InterruptedException:"+e.getMessage(), e);
+			throw new InsertException(InsertException.INTERNAL_ERROR, e, null);
+		}
+		// From here on the permit is held; the matching finally releases it.
+		try {
// First, determine how small it needs to be
Bucket origData = block.getData();
Bucket data = origData;
@@ -302,6 +306,8 @@
sfi.start();
if(earlyEncode) sfi.forceEncode();
}
+ } finally {
+ COMPRESSOR_TYPE.compressorSemaphore.release();
}
}
Modified: trunk/freenet/src/freenet/support/compress/Compressor.java
===================================================================
--- trunk/freenet/src/freenet/support/compress/Compressor.java 2008-10-28
18:09:24 UTC (rev 23154)
+++ trunk/freenet/src/freenet/support/compress/Compressor.java 2008-10-28
18:09:53 UTC (rev 23155)
@@ -3,10 +3,13 @@
* http://www.gnu.org/ for further details of the GPL. */
package freenet.support.compress;
+import freenet.support.Logger;
import java.io.IOException;
import freenet.support.api.Bucket;
import freenet.support.api.BucketFactory;
+import freenet.support.io.NativeThread;
+import java.util.concurrent.Semaphore;
/**
* A data compressor. Contains methods to get all data compressors.
@@ -49,6 +52,33 @@
public int decompress(byte[] dbuf, int i, int j, byte[] output)
throws CompressionOutputSizeException {
return compressor.decompress(dbuf, i, j, output);
}
+
+
+ // Shared semaphore guarding compression work. Its initial permit count
+ // is the value returned by getMaxRunningCompressionThreads().
+ public static final Semaphore compressorSemaphore = new
Semaphore(getMaxRunningCompressionThreads());
+
+		/**
+		 * Decide how many compression jobs may run at the same time. The
+		 * result is used as the permit count of {@link #compressorSemaphore}.
+		 *
+		 * Returns 1 on Mac OS X, and on any non-Windows system where
+		 * NativeThread.usingNativeCode() is false. Otherwise returns the
+		 * number of available processors, capped at one thread per 128MB of
+		 * maximum JVM memory (and 1 if there is less than 128MB).
+		 *
+		 * @return the maximum number of concurrent compression threads, at least 1
+		 */
+		private static int getMaxRunningCompressionThreads() {
+			int maxRunningThreads = 1;
+
+			String osName = System.getProperty("os.name");
+			boolean isWindows = osName.indexOf("Windows") != -1;
+			// "Mac OS X" starts at index 0, so the match test must be >= 0;
+			// the previous "> 0" could never be true for that os.name.
+			boolean isMacOSX = osName.toLowerCase().indexOf("mac os x") >= 0;
+			// Explicit grouping: the native-code requirement applies to
+			// non-Windows systems only. Without the parentheses "&&" binds
+			// tighter than "||", so a missing native library forced a single
+			// thread on Windows as well.
+			if(!isWindows && (isMacOSX || !NativeThread.usingNativeCode()))
+				// OS/X niceness is really weak, so we don't want any more background CPU load than necessary
+				// Also, on non-Windows, we need the native threads library to be working.
+				maxRunningThreads = 1;
+			else {
+				// Most other OSs will have reasonable niceness, so go by RAM.
+				Runtime r = Runtime.getRuntime();
+				int max = r.availableProcessors(); // FIXME this may change in a VM, poll it
+				long maxMemory = r.maxMemory();
+				if(maxMemory < 128 * 1024 * 1024)
+					max = 1;
+				else
+					// one compressor thread per (128MB of ram + available core);
+					// clamp to Integer.MAX_VALUE before narrowing the long to int
+					max = Math.min(max, (int) (Math.min(Integer.MAX_VALUE, maxMemory / (128 * 1024 * 1024))));
+				maxRunningThreads = max;
+			}
+			Logger.minor(COMPRESSOR_TYPE.class, "Maximum Compressor threads: " + maxRunningThreads);
+			return maxRunningThreads;
+		}
}
public abstract Bucket compress(Bucket data, BucketFactory bf, long
maxLength) throws IOException, CompressionOutputSizeException;