Author: nextgens
Date: 2008-08-25 13:46:46 +0000 (Mon, 25 Aug 2008)
New Revision: 22133

Modified:
   trunk/freenet/src/freenet/node/NodeClientCore.java
   trunk/freenet/src/freenet/support/io/ArrayBucket.java
   trunk/freenet/src/freenet/support/io/ArrayBucketFactory.java
   trunk/freenet/src/freenet/support/io/PaddedEphemerallyEncryptedBucket.java
   trunk/freenet/src/freenet/support/io/TempBucketFactory.java
   trunk/freenet/test/freenet/clients/http/filter/ContentFilterTest.java
   trunk/freenet/test/freenet/support/compress/GzipCompressorTest.java
Log:
TempBucketFactory: it is *very* unlikely that it won't break things! If it 
doesn't it should speed things up significantly and solve the OOM problems a 
few users have been reporting.

Why isn't PaddedEphemerallyEncryptedBucket not properly synchronized?

Modified: trunk/freenet/src/freenet/node/NodeClientCore.java
===================================================================
--- trunk/freenet/src/freenet/node/NodeClientCore.java  2008-08-25 01:29:48 UTC 
(rev 22132)
+++ trunk/freenet/src/freenet/node/NodeClientCore.java  2008-08-25 13:46:46 UTC 
(rev 22133)
@@ -154,6 +154,8 @@
                                        // FIXME
                                        throw new 
InvalidConfigValueException(l10n("movingTempDirOnTheFlyNotSupported"));
                                }
+                               
+                               @Override
                                public boolean isReadOnly() {
                                        return true;
                                }
@@ -199,6 +201,8 @@
                                        // FIXME
                                        throw new 
InvalidConfigValueException("Moving persistent temp directory on the fly not 
supported at present");
                                }
+                               
+                               @Override
                                public boolean isReadOnly() {
                                        return true;
                                }
@@ -210,7 +214,7 @@
                        throw new 
NodeInitException(NodeInitException.EXIT_BAD_TEMP_DIR, msg);
                }

-               nodeConfig.register("maxRAMBucketSize", "32KiB", sortOrder++, 
true, false, "NodeClientCore.maxRAMBucketSize", 
"NodeClientCore.maxRAMBucketSizeLong", new LongCallback() {
+               nodeConfig.register("maxRAMBucketSize", "128KiB", sortOrder++, 
true, false, "NodeClientCore.maxRAMBucketSize", 
"NodeClientCore.maxRAMBucketSizeLong", new LongCallback() {

                        public Long get() {
                                return (tempBucketFactory == null ? 0 : 
tempBucketFactory.getMaxRAMBucketSize());
@@ -247,7 +251,7 @@
                                tempBucketFactory.setEncryption(val);
                        }
                });
-               tempBucketFactory = new 
TempBucketFactory(tempFilenameGenerator, 
nodeConfig.getLong("maxRAMBucketSize"), 
nodeConfig.getLong("RAMBucketPoolSize"), random, node.fastWeakRandom, 
nodeConfig.getBoolean("encryptTempBuckets"));
+               tempBucketFactory = new TempBucketFactory(node.executor, 
tempFilenameGenerator, nodeConfig.getLong("maxRAMBucketSize"), 
nodeConfig.getLong("RAMBucketPoolSize"), random, node.fastWeakRandom, 
nodeConfig.getBoolean("encryptTempBuckets"));

                // Downloads directory


Modified: trunk/freenet/src/freenet/support/io/ArrayBucket.java
===================================================================
--- trunk/freenet/src/freenet/support/io/ArrayBucket.java       2008-08-25 
01:29:48 UTC (rev 22132)
+++ trunk/freenet/src/freenet/support/io/ArrayBucket.java       2008-08-25 
13:46:46 UTC (rev 22133)
@@ -1,6 +1,5 @@
 package freenet.support.io;

-import freenet.support.Logger;
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
@@ -20,32 +19,33 @@
        private final String name;
        private volatile boolean readOnly;

-       /** The maximum size of the bucket; -1 means no maxSize */
-       private final long maxSize;
        private long size;

-       private static boolean logDEBUG = false;
-       
-       public ArrayBucket(long maxSize) {
-               this("ArrayBucket", maxSize);
+       /** Create a new immutable ArrayBucket with the data provided and a 
name*/
+       public ArrayBucket(String name, byte[] initdata) {
+               this(name);
+               data.add(initdata);
+               this.size = initdata.length;
+               setReadOnly();
        }

-       public ArrayBucket(byte[] finalData) {
-               this(finalData, finalData.length);
+       /** Create a new immutable ArrayBucket with the data provided */        
+       public ArrayBucket(byte[] initdata) {
+               this();
+               data.add(initdata);
+               this.size = initdata.length;
                setReadOnly();
        }

-       public ArrayBucket(byte[] initdata, long maxSize) {
-               this("ArrayBucket", maxSize);
-               data.add(initdata);
+       /** Create a new array bucket */
+       public ArrayBucket() {
+               this("ArrayBucket");
        }

-       ArrayBucket(String name, long maxSize) {
+       /** Create a new array bucket with the provided name */
+       public ArrayBucket(String name) {
                data = new ArrayList<byte[]>();
                this.name = name;
-               this.maxSize = maxSize;
-               if(logDEBUG && maxSize < 0)
-                       Logger.minor(this, "Has been called with maxSize<0 !", 
new NullPointerException());
        }

        public synchronized OutputStream getOutputStream() throws IOException {
@@ -86,7 +86,7 @@
        }

        private class ArrayBucketOutputStream extends ByteArrayOutputStream {
-               boolean hasBeenClosed = false;
+               private boolean hasBeenClosed = false;

                public ArrayBucketOutputStream() {
                        super();
@@ -95,23 +95,17 @@
                @Override
                public synchronized void write(byte b[], int off, int len) {
                        if(readOnly) throw new IllegalStateException("Read 
only");
-                       long sizeIfWritten = size + len;
-                       if(logDEBUG && maxSize > -1 && maxSize < sizeIfWritten) 
// FIXME: should be IOE but how to do it?
-                               throw new IllegalArgumentException("The maxSize 
of the bucket is "+maxSize+
-                                       " and writing "+len+ " bytes to it 
would make it oversize!");
+                       if(hasBeenClosed) throw new IllegalStateException("Has 
been closed!");
                        super.write(b, off, len);
-                       size = sizeIfWritten;
+                       size +=len;
                }

                @Override
                public synchronized void write(int b) {
                        if(readOnly) throw new IllegalStateException("Read 
only");
-                       long sizeIfWritten = size + 1;
-                       if(logDEBUG && maxSize > -1 && maxSize < sizeIfWritten) 
// FIXME: should be IOE but how to do it?
-                               throw new IllegalArgumentException("The maxSize 
of the bucket is "+maxSize+
-                                       " and writing 1 byte to it would make 
it oversize!");
+                       if(hasBeenClosed) throw new IllegalStateException("Has 
been closed!");
                        super.write(b);
-                       size = sizeIfWritten;
+                       size++;
                }

                @Override
@@ -125,21 +119,23 @@

        private class ArrayBucketInputStream extends InputStream {

-               private Iterator i;
+               private final Iterator<byte[]> i;
                private ByteArrayInputStream in;
+               private boolean hasBeenClosed = false;

                public ArrayBucketInputStream() {
                        i = data.iterator();
                }

-               public synchronized int read() {
+               public synchronized int read() throws IOException {
                        return priv_read();
                }

-               private synchronized int priv_read() {
+               private synchronized int priv_read() throws IOException {
+                       if(hasBeenClosed) throw new IOException("Has been 
closed!");
                        if (in == null) {
                                if (i.hasNext()) {
-                                       in = new ByteArrayInputStream((byte[]) 
i.next());
+                                       in = new ByteArrayInputStream(i.next());
                                } else {
                                        return -1;
                                }
@@ -154,19 +150,20 @@
                }

                @Override
-               public synchronized int read(byte[] b) {
+               public synchronized int read(byte[] b) throws IOException {
                        return priv_read(b, 0, b.length);
                }

                @Override
-               public synchronized int read(byte[] b, int off, int len) {
+               public synchronized int read(byte[] b, int off, int len) throws 
IOException {
                        return priv_read(b, off, len);
                }

-               private synchronized int priv_read(byte[] b, int off, int len) {
+               private synchronized int priv_read(byte[] b, int off, int len) 
throws IOException {
+                       if(hasBeenClosed) throw new IOException("Has been 
closed!");
                        if (in == null) {
                                if (i.hasNext()) {
-                                       in = new ByteArrayInputStream((byte[]) 
i.next());
+                                       in = new ByteArrayInputStream(i.next());
                                } else {
                                        return -1;
                                }
@@ -181,10 +178,11 @@
                }

                @Override
-               public synchronized int available() {
+               public synchronized int available() throws IOException {
+                       if(hasBeenClosed) throw new IOException("Has been 
closed!");
                        if (in == null) {
                                if (i.hasNext()) {
-                                       in = new ByteArrayInputStream((byte[]) 
i.next());
+                                       in = new ByteArrayInputStream(i.next());
                                } else {
                                        return 0;
                                }
@@ -192,6 +190,12 @@
                        return in.available();
                }

+               @Override
+               public synchronized void close() throws IOException {
+                       if(hasBeenClosed) return;
+                       hasBeenClosed = true;
+                       Closer.close(in);
+               }
        }

        public boolean isReadOnly() {
@@ -214,8 +218,7 @@
                int bufSize = (int)sz;
                byte[] buf = new byte[bufSize];
                int index = 0;
-               for(Iterator i=data.iterator();i.hasNext();) {
-                       byte[] obuf = (byte[]) i.next();
+               for(byte[] obuf : data) {
                        System.arraycopy(obuf, 0, buf, index, obuf.length);
                        index += obuf.length;
                }

Modified: trunk/freenet/src/freenet/support/io/ArrayBucketFactory.java
===================================================================
--- trunk/freenet/src/freenet/support/io/ArrayBucketFactory.java        
2008-08-25 01:29:48 UTC (rev 22132)
+++ trunk/freenet/src/freenet/support/io/ArrayBucketFactory.java        
2008-08-25 13:46:46 UTC (rev 22133)
@@ -11,7 +11,7 @@
 public class ArrayBucketFactory implements BucketFactory {

        public Bucket makeBucket(long size) throws IOException {
-               return new ArrayBucket(size);
+               return new ArrayBucket();
        }

        public void freeBucket(Bucket b) throws IOException {

Modified: 
trunk/freenet/src/freenet/support/io/PaddedEphemerallyEncryptedBucket.java
===================================================================
--- trunk/freenet/src/freenet/support/io/PaddedEphemerallyEncryptedBucket.java  
2008-08-25 01:29:48 UTC (rev 22132)
+++ trunk/freenet/src/freenet/support/io/PaddedEphemerallyEncryptedBucket.java  
2008-08-25 13:46:46 UTC (rev 22133)
@@ -151,6 +151,7 @@
                        }
                }

+               @Override
                public void write(byte[] buf, int offset, int length) throws 
IOException {
                        if(closed) throw new IOException("Already closed!");
                        if(streamNumber != lastOutputStream)
@@ -165,14 +166,7 @@
                        }
                }

-               // Override this or FOS will use write(int)
-               public void write(byte[] buf) throws IOException {
-                       if(closed) throw new IOException("Already closed!");
-                       if(streamNumber != lastOutputStream)
-                               throw new IllegalStateException("Writing to old 
stream in "+getName());
-                       write(buf, 0, buf.length);
-               }
-               
+               @Override
                public void close() throws IOException {
                        if(closed) return;
                        try {
@@ -225,11 +219,13 @@
                        return pcfb.decipher(x);
                }

+               @Override
                public final int available() {
                        int x = (int)Math.min(dataLength - ptr, 
Integer.MAX_VALUE);
                        return (x < 0) ? 0 : x;
                }

+               @Override
                public int read(byte[] buf, int offset, int length) throws 
IOException {
                        // FIXME remove debugging
                        if((length+offset > buf.length) || (offset < 0) || 
(length < 0))
@@ -244,10 +240,12 @@
                        return readBytes;
                }

+               @Override
                public int read(byte[] buf) throws IOException {
                        return read(buf, 0, buf.length);
                }

+               @Override
                public long skip(long bytes) throws IOException {
                        byte[] buf = new byte[(int)Math.min(4096, bytes)];
                        long skipped = 0;
@@ -259,6 +257,7 @@
                        return skipped;
                }

+               @Override
                public void close() throws IOException {
                        in.close();
                }
@@ -308,6 +307,7 @@
                return "Encrypted:"+bucket.getName();
        }

+       @Override
        public String toString() {
                return super.toString()+ ':' +bucket.toString();
        }

Modified: trunk/freenet/src/freenet/support/io/TempBucketFactory.java
===================================================================
--- trunk/freenet/src/freenet/support/io/TempBucketFactory.java 2008-08-25 
01:29:48 UTC (rev 22132)
+++ trunk/freenet/src/freenet/support/io/TempBucketFactory.java 2008-08-25 
13:46:46 UTC (rev 22133)
@@ -4,6 +4,10 @@
 package freenet.support.io;

 import freenet.crypt.RandomSource;
+import freenet.support.Executor;
+import freenet.support.Logger;
+import freenet.support.SizeUtil;
+import freenet.support.TimeUtil;
 import java.io.IOException;

 import freenet.support.api.Bucket;
@@ -11,101 +15,259 @@

 import java.io.InputStream;
 import java.io.OutputStream;
+import java.util.LinkedList;
+import java.util.Queue;
 import java.util.Random;
+import java.util.concurrent.LinkedBlockingDeque;

 /**
  * Temporary Bucket Factory
+ * 
+ * Buckets created by this factory can be either:
+ *     - ArrayBuckets
+ * OR
+ *     - FileBuckets
+ * 
+ * ArrayBuckets are used if and only if:
+ *     1) there is enough room remaining on the pool (@see maxRamUsed and @see 
bytesInUse)
+ *     2) the initial size is smaller than (@maxRAMBucketSize)
+ * 
+ * Depending on how they are used they might switch from one type to another 
transparently.
+ * 
+ * Currently they are two factors considered for a migration:
+ *     - if they are long-lived or not (@see RAMBUCKET_MAX_AGE)
+ *     - if their size is over RAMBUCKET_CONVERSION_FACTOR*maxRAMBucketSize
  */
 public class TempBucketFactory implements BucketFactory {
+       public final static long defaultIncrement = 4096;
+       public final static float DEFAULT_FACTOR = 1.25F;
+       
+       private final FilenameGenerator filenameGenerator;
+       private long bytesInUse = 0;
+       private final RandomSource strongPRNG;
+       private final Random weakPRNG;
+       private final Executor executor;
+       private volatile boolean logMINOR;
+       private volatile boolean reallyEncrypt;
+       
+       /** How big can the defaultSize be for us to consider using RAMBuckets? 
*/
+       private long maxRAMBucketSize;
+       /** How much memory do we dedicate to the RAMBucketPool? (in bytes) */
+       private long maxRamUsed;
+       
+       /** How old is a long-lived RAMBucket? */
+       private final int RAMBUCKET_MAX_AGE = 5*60*1000; // 5mins
+       /** How many times the maxRAMBucketSize can a RAMBucket be before it 
gets migrated? */
+       private final int RAMBUCKET_CONVERSION_FACTOR = 4;
+       
        public class TempBucket implements Bucket {
                private Bucket currentBucket;
+               private long currentSize;
+               private volatile boolean shouldResetOS = false;
+               private volatile boolean shouldResetIS = false;
+               public final long creationTime;

-               public TempBucket(Bucket cur) {
+               public TempBucket(long now, Bucket cur) {
+                       if(cur == null)
+                               throw new NullPointerException();
                        this.currentBucket = cur;
+                       this.creationTime = now;
                }

-               public final void migrateToFileBucket() throws IOException {
-                       RAMBucket ramBucket = null;
-                       synchronized(this) {
+               /** A blocking method to force-migrate from a RAMBucket to a 
FileBucket */
+               private final void migrateToFileBucket() throws IOException {
+                       Bucket toMigrate = null;
+                       synchronized(currentBucket) {
                                if(!isRAMBucket())
+                                       // Nothing to migrate! We don't want to 
switch back to ram, do we?                                      
                                        return;

-                               ramBucket = (RAMBucket) currentBucket;
-                               TempFileBucket tempFB = new 
TempFileBucket(filenameGenerator.makeRandomFilename(), filenameGenerator);
+                               toMigrate = currentBucket;
+                               Bucket tempFB = _makeFileBucket();
                                BucketTools.copy(currentBucket, tempFB);
                                currentBucket = tempFB;
+                               // We need streams to be reset to point to the 
new bucket
+                               shouldResetOS = true;
+                               shouldResetIS = true;
                        }
-                       ramBucket.free();
+                       if(logMINOR)
+                               Logger.minor(this, "We have migrated 
"+toMigrate.hashCode());
+                       
+                       // Might have changed already so we can't rely on 
currentSize!
+                       _hasFreed(toMigrate.size());
+                       // We can free it on-thread as it's a rambucket
+                       toMigrate.free();
                }

-               public final synchronized boolean isRAMBucket() {
-                       return (currentBucket instanceof RAMBucket);
+               public final boolean isRAMBucket() {
+                       synchronized(currentBucket) {
+                               return (currentBucket instanceof ArrayBucket);
+                       }
                }

-               public synchronized OutputStream getOutputStream() throws 
IOException {
-                       return currentBucket.getOutputStream();
+               public OutputStream getOutputStream() throws IOException {
+                       synchronized(currentBucket) {
+                               shouldResetOS = true;
+                               return new TempBucketOutputStream();
+                       }
                }

-               public synchronized InputStream getInputStream() throws 
IOException {
-                       return currentBucket.getInputStream();
+               private class TempBucketOutputStream extends OutputStream {
+                       private OutputStream os;
+                       
+                       private void _maybeMigrateRamBucket(long futureSize) 
throws IOException {
+                               if(isRAMBucket()) {
+                                       boolean shouldMigrate = false;
+                                       boolean isOversized = false;
+                                       
+                                       if(futureSize > maxRAMBucketSize * 
RAMBUCKET_CONVERSION_FACTOR) {
+                                               isOversized = true;
+                                               shouldMigrate = true;
+                                       } else if (futureSize + currentSize > 
maxRamUsed)
+                                               shouldMigrate = true;
+                                       
+                                       if(shouldMigrate) {
+                                               os.close();
+                                               if(logMINOR) {
+                                                       if(isOversized)
+                                                               
Logger.minor(this, "The bucket is over 
"+SizeUtil.formatSize(maxRAMBucketSize*RAMBUCKET_CONVERSION_FACTOR)+": we will 
force-migrate it to disk.");
+                                                       else
+                                                               
Logger.minor(this, "The bucketpool is full: force-migrate before we go over the 
limit");
+                                               }
+                                               migrateToFileBucket();
+                                       }
+                               }
+                       }
+                       
+                       private void _maybeResetOutputStream() throws 
IOException {
+                               if(shouldResetOS) {
+                                       Closer.close(os);
+                                       os = currentBucket.getOutputStream();
+                                       shouldResetOS = false;
+                               }
+                       }
+                       
+                       @Override
+                       public void write(int b) throws IOException {
+                               synchronized(currentBucket) {
+                                       long futurSize = currentSize + 1;
+                                       _maybeMigrateRamBucket(futurSize);
+                                       _maybeResetOutputStream();
+                                       os.write(b);
+                                       currentSize = futurSize;
+                                       if(isRAMBucket()) // We need to 
re-check because it might have changed!
+                                               _hasTaken(1);
+                               }
+                       }
+                       
+                       @Override
+                       public void write(byte b[], int off, int len) throws 
IOException {
+                               synchronized(currentBucket) {
+                                       long futurSize = currentSize + len;
+                                       _maybeMigrateRamBucket(futurSize);
+                                       _maybeResetOutputStream();
+                                       os.write(b, off, len);
+                                       currentSize = futurSize;
+                                       if(isRAMBucket()) // We need to 
re-check because it might have changed!
+                                               _hasTaken(len);
+                               }
+                       }
+                       
+                       @Override
+                       public void flush() throws IOException {
+                               synchronized(currentBucket) {
+                                       _maybeMigrateRamBucket(currentSize);
+                                       _maybeResetOutputStream();
+                                       os.flush();
+                               }
+                       }
+                       
+                       @Override
+                       public void close() throws IOException {
+                               synchronized(currentBucket) {
+                                       _maybeMigrateRamBucket(currentSize);
+                                       _maybeResetOutputStream();
+                                       os.close();
+                               }
+                       }
                }

-               public synchronized String getName() {
-                       return currentBucket.getName();
+               public synchronized InputStream getInputStream() throws 
IOException {
+                       shouldResetIS = true;
+                       return new TempBucketInputStream();
                }
+               
+               private class TempBucketInputStream extends InputStream {
+                       private InputStream is;
+                       
+                       private void _maybeResetInputStream() throws 
IOException {
+                               if(shouldResetIS) {
+                                       Closer.close(is);
+                                       is = currentBucket.getInputStream();
+                                       shouldResetIS = false;
+                               }
+                       }
+                       
+                       @Override
+                       public int read() throws IOException {
+                               synchronized(currentBucket) {
+                                       _maybeResetInputStream();
+                                       return is.read();
+                               }
+                       }
+                       
+                       @Override
+                       public void close() throws IOException {
+                               synchronized(currentBucket) {
+                                       _maybeResetInputStream();
+                                       is.close();
+                               }
+                       }
+               }

-               public synchronized long size() {
-                       return currentBucket.size();
+               public String getName() {
+                       synchronized(currentBucket) {
+                               return currentBucket.getName();
+                       }
                }

-               public synchronized boolean isReadOnly() {
-                       return currentBucket.isReadOnly();
+               public long size() {
+                       synchronized(currentBucket) {
+                               return currentBucket.size();
+                       }
                }

-               public synchronized void setReadOnly() {
-                       currentBucket.setReadOnly();
+               public boolean isReadOnly() {
+                       synchronized(currentBucket) {
+                               return currentBucket.isReadOnly();
+                       }
                }

-               public synchronized void free() {
-                       currentBucket.free();
+               public void setReadOnly() {
+                       synchronized(currentBucket) {
+                               currentBucket.setReadOnly();
+                       }
                }
-       }

-       private class RAMBucket extends ArrayBucket {
-               public RAMBucket(long size) {
-                       super("RAMBucket", size);
-                       _hasTaken(size);
-               }
-               
-               @Override
                public void free() {
-                       super.free();
-                       _hasFreed(size());
+                       synchronized(currentBucket) {
+                               if(isRAMBucket())
+                                       _hasFreed(currentSize);
+                               currentBucket.free();
+                       }
                }
        }

-       private final FilenameGenerator filenameGenerator;
-       private long bytesInUse = 0;
-       
-       public final static long defaultIncrement = 4096;
-       
-       public final static float DEFAULT_FACTOR = 1.25F;
-       
-       public long maxRAMBucketSize;
-       public long maxRamUsed;
-
-       private final RandomSource strongPRNG;
-       private final Random weakPRNG;
-       private volatile boolean reallyEncrypt;
-
        // Storage accounting disabled by default.
-       public TempBucketFactory(FilenameGenerator filenameGenerator, long 
maxBucketSizeKeptInRam, long maxRamUsed, RandomSource strongPRNG, Random 
weakPRNG, boolean reallyEncrypt) {
+       public TempBucketFactory(Executor executor, FilenameGenerator 
filenameGenerator, long maxBucketSizeKeptInRam, long maxRamUsed, RandomSource 
strongPRNG, Random weakPRNG, boolean reallyEncrypt) {
                this.filenameGenerator = filenameGenerator;
                this.maxRamUsed = maxRamUsed;
                this.maxRAMBucketSize = maxBucketSizeKeptInRam;
                this.strongPRNG = strongPRNG;
                this.weakPRNG = weakPRNG;
                this.reallyEncrypt = reallyEncrypt;
+               this.executor = executor;
+               this.logMINOR = Logger.shouldLog(Logger.MINOR, this);
        }

        public Bucket makeBucket(long size) throws IOException {
@@ -129,22 +291,27 @@
        }

        public synchronized void setMaxRamUsed(long size) {
+               logMINOR = Logger.shouldLog(Logger.MINOR, this);
                maxRamUsed = size;
        }

        public synchronized long getMaxRamUsed() {
+               logMINOR = Logger.shouldLog(Logger.MINOR, this);
                return maxRamUsed;
        }

        public synchronized void setMaxRAMBucketSize(long size) {
+               logMINOR = Logger.shouldLog(Logger.MINOR, this);
                maxRAMBucketSize = size;
        }

        public synchronized long getMaxRAMBucketSize() {
+               logMINOR = Logger.shouldLog(Logger.MINOR, this);
                return maxRAMBucketSize;
        }

        public void setEncryption(boolean value) {
+               logMINOR = Logger.shouldLog(Logger.MINOR, this);
                reallyEncrypt = value;
        }

@@ -167,7 +334,10 @@
        public TempBucket makeBucket(long size, float factor, long increment) 
throws IOException {
                Bucket realBucket = null;
                boolean useRAMBucket = false;
+               long now = System.currentTimeMillis();

+               // We need to clean the queue in order to have "space" to host 
new buckets
+               cleanBucketQueue(now);
                synchronized(this) {
                        if((size > 0) && (size <= maxRAMBucketSize) && 
(bytesInUse <= maxRamUsed)) {
                                useRAMBucket = true;
@@ -175,10 +345,59 @@
                }

                // Do we want a RAMBucket or a FileBucket?
-               realBucket = (useRAMBucket ? new RAMBucket(size) : new 
TempFileBucket(filenameGenerator.makeRandomFilename(), filenameGenerator));
-               // Do we want it to be encrypted?
-               realBucket = (!reallyEncrypt ? realBucket : new 
PaddedEphemerallyEncryptedBucket(realBucket, 1024, strongPRNG, weakPRNG));
+               realBucket = (useRAMBucket ? new ArrayBucket() : 
_makeFileBucket());

-               return new TempBucket(realBucket);
+               TempBucket toReturn = new TempBucket(now, realBucket);
+               if(useRAMBucket) { // No need to consider them for migration if 
they can't be migrated
+                       synchronized(ramBucketQueue) {
+                               ramBucketQueue.add(toReturn);
+                       }
+               }
+               return toReturn;
+}
+       
+       /** Migrate all long-lived buckets from the queue */
+       private void cleanBucketQueue(long now) {
+               boolean shouldContinue = true;
+               // create a new list to avoid race-conditions
+               final Queue<TempBucket> toMigrate = new 
LinkedList<TempBucket>();
+               do {
+                       synchronized(ramBucketQueue) {
+                               final TempBucket tmpBucket = 
ramBucketQueue.peek();
+                               if((tmpBucket == null) || 
(tmpBucket.creationTime + RAMBUCKET_MAX_AGE > now))
+                                       shouldContinue = false;
+                               else {
+                                       if(logMINOR)
+                                               Logger.minor(this, "The bucket 
is "+TimeUtil.formatTime(now - tmpBucket.creationTime)+" old: we will 
force-migrate it to disk.");
+                                       ramBucketQueue.remove(tmpBucket);
+                                       toMigrate.add(tmpBucket);
+                               }
+                       }
+               } while(shouldContinue);
+
+               if(toMigrate.size() > 0) {
+                       executor.execute(new Runnable() {
+
+                               public void run() {
+                                       if(logMINOR)
+                                               Logger.minor(this, "We are 
going to migrate " + toMigrate.size() + " RAMBuckets");
+                                       for(TempBucket tmpBucket : toMigrate) {
+                                               try {
+                                                       
tmpBucket.migrateToFileBucket();
+                                               } catch(IOException e) {
+                                                       Logger.error(tmpBucket, 
"An IOE occured while migrating long-lived buckets:" + e.getMessage(), e);
+                                               }
+                                       }
+                               }
+                       }, "RAMBucket migrator ("+now+')');
+               }
        }
+       
+       private final Queue<TempBucket> ramBucketQueue = new 
LinkedBlockingDeque<TempBucket>();
+       
+       private Bucket _makeFileBucket() {
+               Bucket fileBucket = new 
TempFileBucket(filenameGenerator.makeRandomFilename(), filenameGenerator);
+               // Do we want it to be encrypted?
+               return (reallyEncrypt ? new 
PaddedEphemerallyEncryptedBucket(fileBucket, 1024, strongPRNG, weakPRNG) : 
fileBucket);
+       }
 }

Modified: trunk/freenet/test/freenet/clients/http/filter/ContentFilterTest.java
===================================================================
--- trunk/freenet/test/freenet/clients/http/filter/ContentFilterTest.java       
2008-08-25 01:29:48 UTC (rev 22132)
+++ trunk/freenet/test/freenet/clients/http/filter/ContentFilterTest.java       
2008-08-25 13:46:46 UTC (rev 22133)
@@ -64,6 +64,6 @@
                URI baseURI = new URI(BASE_URI);
                byte[] dataToFilter = data.getBytes("UTF-8");

-               return ContentFilter.filter(new ArrayBucket(dataToFilter, -1), 
bf, typeName, baseURI, null).data.toString();
+               return ContentFilter.filter(new ArrayBucket(dataToFilter), bf, 
typeName, baseURI, null).data.toString();
        }
 }

Modified: trunk/freenet/test/freenet/support/compress/GzipCompressorTest.java
===================================================================
--- trunk/freenet/test/freenet/support/compress/GzipCompressorTest.java 
2008-08-25 01:29:48 UTC (rev 22132)
+++ trunk/freenet/test/freenet/support/compress/GzipCompressorTest.java 
2008-08-25 13:46:46 UTC (rev 22133)
@@ -111,7 +111,7 @@
        public void testCompressException() {

                byte[] uncompressedData = UNCOMPRESSED_DATA_1.getBytes();
-               Bucket inBucket = new ArrayBucket(uncompressedData, 
uncompressedData.length);
+               Bucket inBucket = new ArrayBucket(uncompressedData);
                BucketFactory factory = new ArrayBucketFactory();

                try {
@@ -133,7 +133,7 @@

                byte[] compressedData = doCompress(uncompressedData);

-               Bucket inBucket = new ArrayBucket(compressedData, 
uncompressedData.length);
+               Bucket inBucket = new ArrayBucket(compressedData);
                BucketFactory factory = new ArrayBucketFactory();

                try {
@@ -147,7 +147,7 @@

        private byte[] doBucketDecompress(byte[] compressedData) {

-               Bucket inBucket = new ArrayBucket(compressedData, 
compressedData.length);
+               Bucket inBucket = new ArrayBucket(compressedData);
                BucketFactory factory = new ArrayBucketFactory();
                Bucket outBucket = null;

@@ -179,7 +179,7 @@
        }

        private byte[] doCompress(byte[] uncompressedData) {
-               Bucket inBucket = new ArrayBucket(uncompressedData, 
uncompressedData.length);
+               Bucket inBucket = new ArrayBucket(uncompressedData);
                BucketFactory factory = new ArrayBucketFactory();
                Bucket outBucket = null;



Reply via email to