On Wed, Oct 29, 2008 at 2:09 AM,  <nextgens at freenetproject.org> wrote:
> Author: nextgens
> Date: 2008-10-28 18:09:53 +0000 (Tue, 28 Oct 2008)
> New Revision: 23155
>
> Modified:
>   trunk/freenet/src/freenet/client/async/SingleFileInserter.java
>   trunk/freenet/src/freenet/support/compress/Compressor.java
> Log:
> Use java.util.concurrent.Semaphore to serialize compression-related 
> operations (not tested)
>
> Modified: trunk/freenet/src/freenet/client/async/SingleFileInserter.java
> ===================================================================
> --- trunk/freenet/src/freenet/client/async/SingleFileInserter.java      
> 2008-10-28 18:09:24 UTC (rev 23154)
> +++ trunk/freenet/src/freenet/client/async/SingleFileInserter.java      
> 2008-10-28 18:09:53 UTC (rev 23155)
> @@ -26,6 +26,7 @@
>  import freenet.support.io.BucketChainBucketFactory;
>  import freenet.support.io.BucketTools;
>  import freenet.support.io.NativeThread;
> +import java.util.concurrent.Semaphore;
>
>  /**
>  * Attempt to insert a file. May include metadata.
> @@ -108,10 +109,6 @@
>                OffThreadCompressor otc = new OffThreadCompressor();
>                ctx.executor.execute(otc, "Compressor for " + this);
>        }
> -
> -       // Use a mutex to serialize compression (limit memory usage/IO)
> -       // Of course it doesn't make any sense on multi-core systems.
> -       private static final Object compressorSync = new Object();
>
>        private  class OffThreadCompressor implements PrioRunnable {
>                public void run() {
> @@ -140,7 +137,14 @@
>        }
>
>        private void tryCompress() throws InsertException {
> -               synchronized(compressorSync) {
> +               try {
> +                       try {
> +                               COMPRESSOR_TYPE.compressorSemaphore.acquire();
> +                       } catch (InterruptedException e) {
> +                               // should not happen
> +                               Logger.error(this, "Caught an 
> InterruptedException:"+e.getMessage(), e);
> +                               throw new 
> InsertException(InsertException.INTERNAL_ERROR, e, null);
> +                       }
>                // First, determine how small it needs to be
>                Bucket origData = block.getData();
>                Bucket data = origData;
> @@ -302,6 +306,8 @@
>                        sfi.start();
>                        if(earlyEncode) sfi.forceEncode();
>                }
> +               } finally {
> +               COMPRESSOR_TYPE.compressorSemaphore.release();

The semaphore should be declared in Compressor itself, not in Compressor.COMPRESSOR_TYPE.

>                }
>        }
>
>
> Modified: trunk/freenet/src/freenet/support/compress/Compressor.java
> ===================================================================
> --- trunk/freenet/src/freenet/support/compress/Compressor.java  2008-10-28 
> 18:09:24 UTC (rev 23154)
> +++ trunk/freenet/src/freenet/support/compress/Compressor.java  2008-10-28 
> 18:09:53 UTC (rev 23155)
> @@ -3,10 +3,13 @@
>  * http://www.gnu.org/ for further details of the GPL. */
>  package freenet.support.compress;
>
> +import freenet.support.Logger;
>  import java.io.IOException;
>
>  import freenet.support.api.Bucket;
>  import freenet.support.api.BucketFactory;
> +import freenet.support.io.NativeThread;
> +import java.util.concurrent.Semaphore;
>
>  /**
>  * A data compressor. Contains methods to get all data compressors.
> @@ -49,6 +52,33 @@
>                public int decompress(byte[] dbuf, int i, int j, byte[] 
> output) throws CompressionOutputSizeException {
>                        return compressor.decompress(dbuf, i, j, output);
>                }
> +
> +
> +               public static final Semaphore compressorSemaphore = new 
> Semaphore(getMaxRunningCompressionThreads());
> +
> +               private static int getMaxRunningCompressionThreads() {
> +                       int maxRunningThreads = 1;
> +
> +                       String osName = System.getProperty("os.name");
> +                       if(osName.indexOf("Windows") == -1 && 
> (osName.toLowerCase().indexOf("mac os x") > 0) || 
> (!NativeThread.usingNativeCode()))
> +                               // OS/X niceness is really weak, so we don't 
> want any more background CPU load than necessary
> +                               // Also, on non-Windows, we need the native 
> threads library to be working.
> +                               maxRunningThreads = 1;
> +                       else {
> +                               // Most other OSs will have reasonable 
> niceness, so go by RAM.
> +                               Runtime r = Runtime.getRuntime();
> +                               int max = r.availableProcessors(); // FIXME 
> this may change in a VM, poll it
> +                               long maxMemory = r.maxMemory();
> +                               if(maxMemory < 128 * 1024 * 1024)
> +                                       max = 1;
> +                               else
> +                                       // one compressor thread per (128MB 
> of ram + available core)
> +                                       max = Math.min(max, (int) 
> (Math.min(Integer.MAX_VALUE, maxMemory / (128 * 1024 * 1024))));
> +                               maxRunningThreads = max;
> +                       }
> +                       Logger.minor(COMPRESSOR_TYPE.class, "Maximum 
> Compressor threads: " + maxRunningThreads);
> +                       return maxRunningThreads;
> +               }
>        }
>
>        public abstract Bucket compress(Bucket data, BucketFactory bf, long 
> maxLength) throws IOException, CompressionOutputSizeException;
>
> _______________________________________________
> cvs mailing list
> cvs at freenetproject.org
> http://emu.freenetproject.org/cgi-bin/mailman/listinfo/cvs
>

Reply via email to