Author: nextgens
Date: 2008-10-28 18:09:53 +0000 (Tue, 28 Oct 2008)
New Revision: 23155

Modified:
   trunk/freenet/src/freenet/client/async/SingleFileInserter.java
   trunk/freenet/src/freenet/support/compress/Compressor.java
Log:
Use java.utils.concurrent.Semaphore to serialize compression-related operations 
(not tested)

Modified: trunk/freenet/src/freenet/client/async/SingleFileInserter.java
===================================================================
--- trunk/freenet/src/freenet/client/async/SingleFileInserter.java      
2008-10-28 18:09:24 UTC (rev 23154)
+++ trunk/freenet/src/freenet/client/async/SingleFileInserter.java      
2008-10-28 18:09:53 UTC (rev 23155)
@@ -26,6 +26,7 @@
 import freenet.support.io.BucketChainBucketFactory;
 import freenet.support.io.BucketTools;
 import freenet.support.io.NativeThread;
+import java.util.concurrent.Semaphore;

 /**
  * Attempt to insert a file. May include metadata.
@@ -108,10 +109,6 @@
                OffThreadCompressor otc = new OffThreadCompressor();
                ctx.executor.execute(otc, "Compressor for " + this);
        }
-
-       // Use a mutex to serialize compression (limit memory usage/IO)
-       // Of course it doesn't make any sense on multi-core systems.
-       private static final Object compressorSync = new Object();

        private  class OffThreadCompressor implements PrioRunnable {
                public void run() {
@@ -140,7 +137,14 @@
        }

        private void tryCompress() throws InsertException {
-               synchronized(compressorSync) {
+               try {
+                       try {
+                               COMPRESSOR_TYPE.compressorSemaphore.acquire();
+                       } catch (InterruptedException e) {
+                               // should not happen
+                               Logger.error(this, "Caught an 
InterruptedException:"+e.getMessage(), e);
+                               throw new 
InsertException(InsertException.INTERNAL_ERROR, e, null);
+                       }
                // First, determine how small it needs to be
                Bucket origData = block.getData();
                Bucket data = origData;
@@ -302,6 +306,8 @@
                        sfi.start();
                        if(earlyEncode) sfi.forceEncode();
                }
+               } finally {
+               COMPRESSOR_TYPE.compressorSemaphore.release();
                }
        }


Modified: trunk/freenet/src/freenet/support/compress/Compressor.java
===================================================================
--- trunk/freenet/src/freenet/support/compress/Compressor.java  2008-10-28 
18:09:24 UTC (rev 23154)
+++ trunk/freenet/src/freenet/support/compress/Compressor.java  2008-10-28 
18:09:53 UTC (rev 23155)
@@ -3,10 +3,13 @@
 * http://www.gnu.org/ for further details of the GPL. */
 package freenet.support.compress;

+import freenet.support.Logger;
 import java.io.IOException;

 import freenet.support.api.Bucket;
 import freenet.support.api.BucketFactory;
+import freenet.support.io.NativeThread;
+import java.util.concurrent.Semaphore;

 /**
  * A data compressor. Contains methods to get all data compressors.
@@ -49,6 +52,33 @@
                public int decompress(byte[] dbuf, int i, int j, byte[] output) 
throws CompressionOutputSizeException {
                        return compressor.decompress(dbuf, i, j, output);
                }
+               
+               
+               public static final Semaphore compressorSemaphore = new 
Semaphore(getMaxRunningCompressionThreads());
+               
+               private static int getMaxRunningCompressionThreads() {
+                       int maxRunningThreads = 1;
+                       
+                       String osName = System.getProperty("os.name");
+                       if(osName.indexOf("Windows") == -1 && 
(osName.toLowerCase().indexOf("mac os x") > 0) || 
(!NativeThread.usingNativeCode()))
+                               // OS/X niceness is really weak, so we don't 
want any more background CPU load than necessary
+                               // Also, on non-Windows, we need the native 
threads library to be working.
+                               maxRunningThreads = 1;
+                       else {
+                               // Most other OSs will have reasonable 
niceness, so go by RAM.
+                               Runtime r = Runtime.getRuntime();
+                               int max = r.availableProcessors(); // FIXME 
this may change in a VM, poll it
+                               long maxMemory = r.maxMemory();
+                               if(maxMemory < 128 * 1024 * 1024)
+                                       max = 1;
+                               else
+                                       // one compressor thread per (128MB of 
ram + available core)
+                                       max = Math.min(max, (int) 
(Math.min(Integer.MAX_VALUE, maxMemory / (128 * 1024 * 1024))));
+                               maxRunningThreads = max;
+                       }
+                       Logger.minor(COMPRESSOR_TYPE.class, "Maximum Compressor 
threads: " + maxRunningThreads);
+                       return maxRunningThreads;
+               }
        }

        public abstract Bucket compress(Bucket data, BucketFactory bf, long 
maxLength) throws IOException, CompressionOutputSizeException;


Reply via email to