Author: nextgens
Date: 2008-08-25 13:46:46 +0000 (Mon, 25 Aug 2008)
New Revision: 22133
Modified:
trunk/freenet/src/freenet/node/NodeClientCore.java
trunk/freenet/src/freenet/support/io/ArrayBucket.java
trunk/freenet/src/freenet/support/io/ArrayBucketFactory.java
trunk/freenet/src/freenet/support/io/PaddedEphemerallyEncryptedBucket.java
trunk/freenet/src/freenet/support/io/TempBucketFactory.java
trunk/freenet/test/freenet/clients/http/filter/ContentFilterTest.java
trunk/freenet/test/freenet/support/compress/GzipCompressorTest.java
Log:
TempBucketFactory: it is *very* unlikely that it won't break things! If it
doesn't it should speed things up significantly and solve the OOM problems a
few users have been reporting.
Why isn't PaddedEphemerallyEncryptedBucket not properly synchronized?
Modified: trunk/freenet/src/freenet/node/NodeClientCore.java
===================================================================
--- trunk/freenet/src/freenet/node/NodeClientCore.java 2008-08-25 01:29:48 UTC
(rev 22132)
+++ trunk/freenet/src/freenet/node/NodeClientCore.java 2008-08-25 13:46:46 UTC
(rev 22133)
@@ -154,6 +154,8 @@
// FIXME
throw new
InvalidConfigValueException(l10n("movingTempDirOnTheFlyNotSupported"));
}
+
+ @Override
public boolean isReadOnly() {
return true;
}
@@ -199,6 +201,8 @@
// FIXME
throw new
InvalidConfigValueException("Moving persistent temp directory on the fly not
supported at present");
}
+
+ @Override
public boolean isReadOnly() {
return true;
}
@@ -210,7 +214,7 @@
throw new
NodeInitException(NodeInitException.EXIT_BAD_TEMP_DIR, msg);
}
- nodeConfig.register("maxRAMBucketSize", "32KiB", sortOrder++,
true, false, "NodeClientCore.maxRAMBucketSize",
"NodeClientCore.maxRAMBucketSizeLong", new LongCallback() {
+ nodeConfig.register("maxRAMBucketSize", "128KiB", sortOrder++,
true, false, "NodeClientCore.maxRAMBucketSize",
"NodeClientCore.maxRAMBucketSizeLong", new LongCallback() {
public Long get() {
return (tempBucketFactory == null ? 0 :
tempBucketFactory.getMaxRAMBucketSize());
@@ -247,7 +251,7 @@
tempBucketFactory.setEncryption(val);
}
});
- tempBucketFactory = new
TempBucketFactory(tempFilenameGenerator,
nodeConfig.getLong("maxRAMBucketSize"),
nodeConfig.getLong("RAMBucketPoolSize"), random, node.fastWeakRandom,
nodeConfig.getBoolean("encryptTempBuckets"));
+ tempBucketFactory = new TempBucketFactory(node.executor,
tempFilenameGenerator, nodeConfig.getLong("maxRAMBucketSize"),
nodeConfig.getLong("RAMBucketPoolSize"), random, node.fastWeakRandom,
nodeConfig.getBoolean("encryptTempBuckets"));
// Downloads directory
Modified: trunk/freenet/src/freenet/support/io/ArrayBucket.java
===================================================================
--- trunk/freenet/src/freenet/support/io/ArrayBucket.java 2008-08-25
01:29:48 UTC (rev 22132)
+++ trunk/freenet/src/freenet/support/io/ArrayBucket.java 2008-08-25
13:46:46 UTC (rev 22133)
@@ -1,6 +1,5 @@
package freenet.support.io;
-import freenet.support.Logger;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
@@ -20,32 +19,33 @@
private final String name;
private volatile boolean readOnly;
- /** The maximum size of the bucket; -1 means no maxSize */
- private final long maxSize;
private long size;
- private static boolean logDEBUG = false;
-
- public ArrayBucket(long maxSize) {
- this("ArrayBucket", maxSize);
+ /** Create a new immutable ArrayBucket with the data provided and a
name*/
+ public ArrayBucket(String name, byte[] initdata) {
+ this(name);
+ data.add(initdata);
+ this.size = initdata.length;
+ setReadOnly();
}
- public ArrayBucket(byte[] finalData) {
- this(finalData, finalData.length);
+ /** Create a new immutable ArrayBucket with the data provided */
+ public ArrayBucket(byte[] initdata) {
+ this();
+ data.add(initdata);
+ this.size = initdata.length;
setReadOnly();
}
- public ArrayBucket(byte[] initdata, long maxSize) {
- this("ArrayBucket", maxSize);
- data.add(initdata);
+ /** Create a new array bucket */
+ public ArrayBucket() {
+ this("ArrayBucket");
}
- ArrayBucket(String name, long maxSize) {
+ /** Create a new array bucket with the provided name */
+ public ArrayBucket(String name) {
data = new ArrayList<byte[]>();
this.name = name;
- this.maxSize = maxSize;
- if(logDEBUG && maxSize < 0)
- Logger.minor(this, "Has been called with maxSize<0 !",
new NullPointerException());
}
public synchronized OutputStream getOutputStream() throws IOException {
@@ -86,7 +86,7 @@
}
private class ArrayBucketOutputStream extends ByteArrayOutputStream {
- boolean hasBeenClosed = false;
+ private boolean hasBeenClosed = false;
public ArrayBucketOutputStream() {
super();
@@ -95,23 +95,17 @@
@Override
public synchronized void write(byte b[], int off, int len) {
if(readOnly) throw new IllegalStateException("Read
only");
- long sizeIfWritten = size + len;
- if(logDEBUG && maxSize > -1 && maxSize < sizeIfWritten)
// FIXME: should be IOE but how to do it?
- throw new IllegalArgumentException("The maxSize
of the bucket is "+maxSize+
- " and writing "+len+ " bytes to it
would make it oversize!");
+ if(hasBeenClosed) throw new IllegalStateException("Has
been closed!");
super.write(b, off, len);
- size = sizeIfWritten;
+ size +=len;
}
@Override
public synchronized void write(int b) {
if(readOnly) throw new IllegalStateException("Read
only");
- long sizeIfWritten = size + 1;
- if(logDEBUG && maxSize > -1 && maxSize < sizeIfWritten)
// FIXME: should be IOE but how to do it?
- throw new IllegalArgumentException("The maxSize
of the bucket is "+maxSize+
- " and writing 1 byte to it would make
it oversize!");
+ if(hasBeenClosed) throw new IllegalStateException("Has
been closed!");
super.write(b);
- size = sizeIfWritten;
+ size++;
}
@Override
@@ -125,21 +119,23 @@
private class ArrayBucketInputStream extends InputStream {
- private Iterator i;
+ private final Iterator<byte[]> i;
private ByteArrayInputStream in;
+ private boolean hasBeenClosed = false;
public ArrayBucketInputStream() {
i = data.iterator();
}
- public synchronized int read() {
+ public synchronized int read() throws IOException {
return priv_read();
}
- private synchronized int priv_read() {
+ private synchronized int priv_read() throws IOException {
+ if(hasBeenClosed) throw new IOException("Has been
closed!");
if (in == null) {
if (i.hasNext()) {
- in = new ByteArrayInputStream((byte[])
i.next());
+ in = new ByteArrayInputStream(i.next());
} else {
return -1;
}
@@ -154,19 +150,20 @@
}
@Override
- public synchronized int read(byte[] b) {
+ public synchronized int read(byte[] b) throws IOException {
return priv_read(b, 0, b.length);
}
@Override
- public synchronized int read(byte[] b, int off, int len) {
+ public synchronized int read(byte[] b, int off, int len) throws
IOException {
return priv_read(b, off, len);
}
- private synchronized int priv_read(byte[] b, int off, int len) {
+ private synchronized int priv_read(byte[] b, int off, int len)
throws IOException {
+ if(hasBeenClosed) throw new IOException("Has been
closed!");
if (in == null) {
if (i.hasNext()) {
- in = new ByteArrayInputStream((byte[])
i.next());
+ in = new ByteArrayInputStream(i.next());
} else {
return -1;
}
@@ -181,10 +178,11 @@
}
@Override
- public synchronized int available() {
+ public synchronized int available() throws IOException {
+ if(hasBeenClosed) throw new IOException("Has been
closed!");
if (in == null) {
if (i.hasNext()) {
- in = new ByteArrayInputStream((byte[])
i.next());
+ in = new ByteArrayInputStream(i.next());
} else {
return 0;
}
@@ -192,6 +190,12 @@
return in.available();
}
+ @Override
+ public synchronized void close() throws IOException {
+ if(hasBeenClosed) return;
+ hasBeenClosed = true;
+ Closer.close(in);
+ }
}
public boolean isReadOnly() {
@@ -214,8 +218,7 @@
int bufSize = (int)sz;
byte[] buf = new byte[bufSize];
int index = 0;
- for(Iterator i=data.iterator();i.hasNext();) {
- byte[] obuf = (byte[]) i.next();
+ for(byte[] obuf : data) {
System.arraycopy(obuf, 0, buf, index, obuf.length);
index += obuf.length;
}
Modified: trunk/freenet/src/freenet/support/io/ArrayBucketFactory.java
===================================================================
--- trunk/freenet/src/freenet/support/io/ArrayBucketFactory.java
2008-08-25 01:29:48 UTC (rev 22132)
+++ trunk/freenet/src/freenet/support/io/ArrayBucketFactory.java
2008-08-25 13:46:46 UTC (rev 22133)
@@ -11,7 +11,7 @@
public class ArrayBucketFactory implements BucketFactory {
public Bucket makeBucket(long size) throws IOException {
- return new ArrayBucket(size);
+ return new ArrayBucket();
}
public void freeBucket(Bucket b) throws IOException {
Modified:
trunk/freenet/src/freenet/support/io/PaddedEphemerallyEncryptedBucket.java
===================================================================
--- trunk/freenet/src/freenet/support/io/PaddedEphemerallyEncryptedBucket.java
2008-08-25 01:29:48 UTC (rev 22132)
+++ trunk/freenet/src/freenet/support/io/PaddedEphemerallyEncryptedBucket.java
2008-08-25 13:46:46 UTC (rev 22133)
@@ -151,6 +151,7 @@
}
}
+ @Override
public void write(byte[] buf, int offset, int length) throws
IOException {
if(closed) throw new IOException("Already closed!");
if(streamNumber != lastOutputStream)
@@ -165,14 +166,7 @@
}
}
- // Override this or FOS will use write(int)
- public void write(byte[] buf) throws IOException {
- if(closed) throw new IOException("Already closed!");
- if(streamNumber != lastOutputStream)
- throw new IllegalStateException("Writing to old
stream in "+getName());
- write(buf, 0, buf.length);
- }
-
+ @Override
public void close() throws IOException {
if(closed) return;
try {
@@ -225,11 +219,13 @@
return pcfb.decipher(x);
}
+ @Override
public final int available() {
int x = (int)Math.min(dataLength - ptr,
Integer.MAX_VALUE);
return (x < 0) ? 0 : x;
}
+ @Override
public int read(byte[] buf, int offset, int length) throws
IOException {
// FIXME remove debugging
if((length+offset > buf.length) || (offset < 0) ||
(length < 0))
@@ -244,10 +240,12 @@
return readBytes;
}
+ @Override
public int read(byte[] buf) throws IOException {
return read(buf, 0, buf.length);
}
+ @Override
public long skip(long bytes) throws IOException {
byte[] buf = new byte[(int)Math.min(4096, bytes)];
long skipped = 0;
@@ -259,6 +257,7 @@
return skipped;
}
+ @Override
public void close() throws IOException {
in.close();
}
@@ -308,6 +307,7 @@
return "Encrypted:"+bucket.getName();
}
+ @Override
public String toString() {
return super.toString()+ ':' +bucket.toString();
}
Modified: trunk/freenet/src/freenet/support/io/TempBucketFactory.java
===================================================================
--- trunk/freenet/src/freenet/support/io/TempBucketFactory.java 2008-08-25
01:29:48 UTC (rev 22132)
+++ trunk/freenet/src/freenet/support/io/TempBucketFactory.java 2008-08-25
13:46:46 UTC (rev 22133)
@@ -4,6 +4,10 @@
package freenet.support.io;
import freenet.crypt.RandomSource;
+import freenet.support.Executor;
+import freenet.support.Logger;
+import freenet.support.SizeUtil;
+import freenet.support.TimeUtil;
import java.io.IOException;
import freenet.support.api.Bucket;
@@ -11,101 +15,259 @@
import java.io.InputStream;
import java.io.OutputStream;
+import java.util.LinkedList;
+import java.util.Queue;
import java.util.Random;
+import java.util.concurrent.LinkedBlockingDeque;
/**
* Temporary Bucket Factory
+ *
+ * Buckets created by this factory can be either:
+ * - ArrayBuckets
+ * OR
+ * - FileBuckets
+ *
+ * ArrayBuckets are used if and only if:
+ * 1) there is enough room remaining on the pool (@see maxRamUsed and @see
bytesInUse)
+ * 2) the initial size is smaller than (@maxRAMBucketSize)
+ *
+ * Depending on how they are used they might switch from one type to another
transparently.
+ *
+ * Currently they are two factors considered for a migration:
+ * - if they are long-lived or not (@see RAMBUCKET_MAX_AGE)
+ * - if their size is over RAMBUCKET_CONVERSION_FACTOR*maxRAMBucketSize
*/
public class TempBucketFactory implements BucketFactory {
+ public final static long defaultIncrement = 4096;
+ public final static float DEFAULT_FACTOR = 1.25F;
+
+ private final FilenameGenerator filenameGenerator;
+ private long bytesInUse = 0;
+ private final RandomSource strongPRNG;
+ private final Random weakPRNG;
+ private final Executor executor;
+ private volatile boolean logMINOR;
+ private volatile boolean reallyEncrypt;
+
+ /** How big can the defaultSize be for us to consider using RAMBuckets?
*/
+ private long maxRAMBucketSize;
+ /** How much memory do we dedicate to the RAMBucketPool? (in bytes) */
+ private long maxRamUsed;
+
+ /** How old is a long-lived RAMBucket? */
+ private final int RAMBUCKET_MAX_AGE = 5*60*1000; // 5mins
+ /** How many times the maxRAMBucketSize can a RAMBucket be before it
gets migrated? */
+ private final int RAMBUCKET_CONVERSION_FACTOR = 4;
+
public class TempBucket implements Bucket {
private Bucket currentBucket;
+ private long currentSize;
+ private volatile boolean shouldResetOS = false;
+ private volatile boolean shouldResetIS = false;
+ public final long creationTime;
- public TempBucket(Bucket cur) {
+ public TempBucket(long now, Bucket cur) {
+ if(cur == null)
+ throw new NullPointerException();
this.currentBucket = cur;
+ this.creationTime = now;
}
- public final void migrateToFileBucket() throws IOException {
- RAMBucket ramBucket = null;
- synchronized(this) {
+ /** A blocking method to force-migrate from a RAMBucket to a
FileBucket */
+ private final void migrateToFileBucket() throws IOException {
+ Bucket toMigrate = null;
+ synchronized(currentBucket) {
if(!isRAMBucket())
+ // Nothing to migrate! We don't want to
switch back to ram, do we?
return;
- ramBucket = (RAMBucket) currentBucket;
- TempFileBucket tempFB = new
TempFileBucket(filenameGenerator.makeRandomFilename(), filenameGenerator);
+ toMigrate = currentBucket;
+ Bucket tempFB = _makeFileBucket();
BucketTools.copy(currentBucket, tempFB);
currentBucket = tempFB;
+ // We need streams to be reset to point to the
new bucket
+ shouldResetOS = true;
+ shouldResetIS = true;
}
- ramBucket.free();
+ if(logMINOR)
+ Logger.minor(this, "We have migrated
"+toMigrate.hashCode());
+
+ // Might have changed already so we can't rely on
currentSize!
+ _hasFreed(toMigrate.size());
+ // We can free it on-thread as it's a rambucket
+ toMigrate.free();
}
- public final synchronized boolean isRAMBucket() {
- return (currentBucket instanceof RAMBucket);
+ public final boolean isRAMBucket() {
+ synchronized(currentBucket) {
+ return (currentBucket instanceof ArrayBucket);
+ }
}
- public synchronized OutputStream getOutputStream() throws
IOException {
- return currentBucket.getOutputStream();
+ public OutputStream getOutputStream() throws IOException {
+ synchronized(currentBucket) {
+ shouldResetOS = true;
+ return new TempBucketOutputStream();
+ }
}
- public synchronized InputStream getInputStream() throws
IOException {
- return currentBucket.getInputStream();
+ private class TempBucketOutputStream extends OutputStream {
+ private OutputStream os;
+
+ private void _maybeMigrateRamBucket(long futureSize)
throws IOException {
+ if(isRAMBucket()) {
+ boolean shouldMigrate = false;
+ boolean isOversized = false;
+
+ if(futureSize > maxRAMBucketSize *
RAMBUCKET_CONVERSION_FACTOR) {
+ isOversized = true;
+ shouldMigrate = true;
+ } else if (futureSize + currentSize >
maxRamUsed)
+ shouldMigrate = true;
+
+ if(shouldMigrate) {
+ os.close();
+ if(logMINOR) {
+ if(isOversized)
+
Logger.minor(this, "The bucket is over
"+SizeUtil.formatSize(maxRAMBucketSize*RAMBUCKET_CONVERSION_FACTOR)+": we will
force-migrate it to disk.");
+ else
+
Logger.minor(this, "The bucketpool is full: force-migrate before we go over the
limit");
+ }
+ migrateToFileBucket();
+ }
+ }
+ }
+
+ private void _maybeResetOutputStream() throws
IOException {
+ if(shouldResetOS) {
+ Closer.close(os);
+ os = currentBucket.getOutputStream();
+ shouldResetOS = false;
+ }
+ }
+
+ @Override
+ public void write(int b) throws IOException {
+ synchronized(currentBucket) {
+ long futurSize = currentSize + 1;
+ _maybeMigrateRamBucket(futurSize);
+ _maybeResetOutputStream();
+ os.write(b);
+ currentSize = futurSize;
+ if(isRAMBucket()) // We need to
re-check because it might have changed!
+ _hasTaken(1);
+ }
+ }
+
+ @Override
+ public void write(byte b[], int off, int len) throws
IOException {
+ synchronized(currentBucket) {
+ long futurSize = currentSize + len;
+ _maybeMigrateRamBucket(futurSize);
+ _maybeResetOutputStream();
+ os.write(b, off, len);
+ currentSize = futurSize;
+ if(isRAMBucket()) // We need to
re-check because it might have changed!
+ _hasTaken(len);
+ }
+ }
+
+ @Override
+ public void flush() throws IOException {
+ synchronized(currentBucket) {
+ _maybeMigrateRamBucket(currentSize);
+ _maybeResetOutputStream();
+ os.flush();
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ synchronized(currentBucket) {
+ _maybeMigrateRamBucket(currentSize);
+ _maybeResetOutputStream();
+ os.close();
+ }
+ }
}
- public synchronized String getName() {
- return currentBucket.getName();
+ public synchronized InputStream getInputStream() throws
IOException {
+ shouldResetIS = true;
+ return new TempBucketInputStream();
}
+
+ private class TempBucketInputStream extends InputStream {
+ private InputStream is;
+
+ private void _maybeResetInputStream() throws
IOException {
+ if(shouldResetIS) {
+ Closer.close(is);
+ is = currentBucket.getInputStream();
+ shouldResetIS = false;
+ }
+ }
+
+ @Override
+ public int read() throws IOException {
+ synchronized(currentBucket) {
+ _maybeResetInputStream();
+ return is.read();
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ synchronized(currentBucket) {
+ _maybeResetInputStream();
+ is.close();
+ }
+ }
+ }
- public synchronized long size() {
- return currentBucket.size();
+ public String getName() {
+ synchronized(currentBucket) {
+ return currentBucket.getName();
+ }
}
- public synchronized boolean isReadOnly() {
- return currentBucket.isReadOnly();
+ public long size() {
+ synchronized(currentBucket) {
+ return currentBucket.size();
+ }
}
- public synchronized void setReadOnly() {
- currentBucket.setReadOnly();
+ public boolean isReadOnly() {
+ synchronized(currentBucket) {
+ return currentBucket.isReadOnly();
+ }
}
- public synchronized void free() {
- currentBucket.free();
+ public void setReadOnly() {
+ synchronized(currentBucket) {
+ currentBucket.setReadOnly();
+ }
}
- }
- private class RAMBucket extends ArrayBucket {
- public RAMBucket(long size) {
- super("RAMBucket", size);
- _hasTaken(size);
- }
-
- @Override
public void free() {
- super.free();
- _hasFreed(size());
+ synchronized(currentBucket) {
+ if(isRAMBucket())
+ _hasFreed(currentSize);
+ currentBucket.free();
+ }
}
}
- private final FilenameGenerator filenameGenerator;
- private long bytesInUse = 0;
-
- public final static long defaultIncrement = 4096;
-
- public final static float DEFAULT_FACTOR = 1.25F;
-
- public long maxRAMBucketSize;
- public long maxRamUsed;
-
- private final RandomSource strongPRNG;
- private final Random weakPRNG;
- private volatile boolean reallyEncrypt;
-
// Storage accounting disabled by default.
- public TempBucketFactory(FilenameGenerator filenameGenerator, long
maxBucketSizeKeptInRam, long maxRamUsed, RandomSource strongPRNG, Random
weakPRNG, boolean reallyEncrypt) {
+ public TempBucketFactory(Executor executor, FilenameGenerator
filenameGenerator, long maxBucketSizeKeptInRam, long maxRamUsed, RandomSource
strongPRNG, Random weakPRNG, boolean reallyEncrypt) {
this.filenameGenerator = filenameGenerator;
this.maxRamUsed = maxRamUsed;
this.maxRAMBucketSize = maxBucketSizeKeptInRam;
this.strongPRNG = strongPRNG;
this.weakPRNG = weakPRNG;
this.reallyEncrypt = reallyEncrypt;
+ this.executor = executor;
+ this.logMINOR = Logger.shouldLog(Logger.MINOR, this);
}
public Bucket makeBucket(long size) throws IOException {
@@ -129,22 +291,27 @@
}
public synchronized void setMaxRamUsed(long size) {
+ logMINOR = Logger.shouldLog(Logger.MINOR, this);
maxRamUsed = size;
}
public synchronized long getMaxRamUsed() {
+ logMINOR = Logger.shouldLog(Logger.MINOR, this);
return maxRamUsed;
}
public synchronized void setMaxRAMBucketSize(long size) {
+ logMINOR = Logger.shouldLog(Logger.MINOR, this);
maxRAMBucketSize = size;
}
public synchronized long getMaxRAMBucketSize() {
+ logMINOR = Logger.shouldLog(Logger.MINOR, this);
return maxRAMBucketSize;
}
public void setEncryption(boolean value) {
+ logMINOR = Logger.shouldLog(Logger.MINOR, this);
reallyEncrypt = value;
}
@@ -167,7 +334,10 @@
public TempBucket makeBucket(long size, float factor, long increment)
throws IOException {
Bucket realBucket = null;
boolean useRAMBucket = false;
+ long now = System.currentTimeMillis();
+ // We need to clean the queue in order to have "space" to host
new buckets
+ cleanBucketQueue(now);
synchronized(this) {
if((size > 0) && (size <= maxRAMBucketSize) &&
(bytesInUse <= maxRamUsed)) {
useRAMBucket = true;
@@ -175,10 +345,59 @@
}
// Do we want a RAMBucket or a FileBucket?
- realBucket = (useRAMBucket ? new RAMBucket(size) : new
TempFileBucket(filenameGenerator.makeRandomFilename(), filenameGenerator));
- // Do we want it to be encrypted?
- realBucket = (!reallyEncrypt ? realBucket : new
PaddedEphemerallyEncryptedBucket(realBucket, 1024, strongPRNG, weakPRNG));
+ realBucket = (useRAMBucket ? new ArrayBucket() :
_makeFileBucket());
- return new TempBucket(realBucket);
+ TempBucket toReturn = new TempBucket(now, realBucket);
+ if(useRAMBucket) { // No need to consider them for migration if
they can't be migrated
+ synchronized(ramBucketQueue) {
+ ramBucketQueue.add(toReturn);
+ }
+ }
+ return toReturn;
+}
+
+ /** Migrate all long-lived buckets from the queue */
+ private void cleanBucketQueue(long now) {
+ boolean shouldContinue = true;
+ // create a new list to avoid race-conditions
+ final Queue<TempBucket> toMigrate = new
LinkedList<TempBucket>();
+ do {
+ synchronized(ramBucketQueue) {
+ final TempBucket tmpBucket =
ramBucketQueue.peek();
+ if((tmpBucket == null) ||
(tmpBucket.creationTime + RAMBUCKET_MAX_AGE > now))
+ shouldContinue = false;
+ else {
+ if(logMINOR)
+ Logger.minor(this, "The bucket
is "+TimeUtil.formatTime(now - tmpBucket.creationTime)+" old: we will
force-migrate it to disk.");
+ ramBucketQueue.remove(tmpBucket);
+ toMigrate.add(tmpBucket);
+ }
+ }
+ } while(shouldContinue);
+
+ if(toMigrate.size() > 0) {
+ executor.execute(new Runnable() {
+
+ public void run() {
+ if(logMINOR)
+ Logger.minor(this, "We are
going to migrate " + toMigrate.size() + " RAMBuckets");
+ for(TempBucket tmpBucket : toMigrate) {
+ try {
+
tmpBucket.migrateToFileBucket();
+ } catch(IOException e) {
+ Logger.error(tmpBucket,
"An IOE occured while migrating long-lived buckets:" + e.getMessage(), e);
+ }
+ }
+ }
+ }, "RAMBucket migrator ("+now+')');
+ }
}
+
+ private final Queue<TempBucket> ramBucketQueue = new
LinkedBlockingDeque<TempBucket>();
+
+ private Bucket _makeFileBucket() {
+ Bucket fileBucket = new
TempFileBucket(filenameGenerator.makeRandomFilename(), filenameGenerator);
+ // Do we want it to be encrypted?
+ return (reallyEncrypt ? new
PaddedEphemerallyEncryptedBucket(fileBucket, 1024, strongPRNG, weakPRNG) :
fileBucket);
+ }
}
Modified: trunk/freenet/test/freenet/clients/http/filter/ContentFilterTest.java
===================================================================
--- trunk/freenet/test/freenet/clients/http/filter/ContentFilterTest.java
2008-08-25 01:29:48 UTC (rev 22132)
+++ trunk/freenet/test/freenet/clients/http/filter/ContentFilterTest.java
2008-08-25 13:46:46 UTC (rev 22133)
@@ -64,6 +64,6 @@
URI baseURI = new URI(BASE_URI);
byte[] dataToFilter = data.getBytes("UTF-8");
- return ContentFilter.filter(new ArrayBucket(dataToFilter, -1),
bf, typeName, baseURI, null).data.toString();
+ return ContentFilter.filter(new ArrayBucket(dataToFilter), bf,
typeName, baseURI, null).data.toString();
}
}
Modified: trunk/freenet/test/freenet/support/compress/GzipCompressorTest.java
===================================================================
--- trunk/freenet/test/freenet/support/compress/GzipCompressorTest.java
2008-08-25 01:29:48 UTC (rev 22132)
+++ trunk/freenet/test/freenet/support/compress/GzipCompressorTest.java
2008-08-25 13:46:46 UTC (rev 22133)
@@ -111,7 +111,7 @@
public void testCompressException() {
byte[] uncompressedData = UNCOMPRESSED_DATA_1.getBytes();
- Bucket inBucket = new ArrayBucket(uncompressedData,
uncompressedData.length);
+ Bucket inBucket = new ArrayBucket(uncompressedData);
BucketFactory factory = new ArrayBucketFactory();
try {
@@ -133,7 +133,7 @@
byte[] compressedData = doCompress(uncompressedData);
- Bucket inBucket = new ArrayBucket(compressedData,
uncompressedData.length);
+ Bucket inBucket = new ArrayBucket(compressedData);
BucketFactory factory = new ArrayBucketFactory();
try {
@@ -147,7 +147,7 @@
private byte[] doBucketDecompress(byte[] compressedData) {
- Bucket inBucket = new ArrayBucket(compressedData,
compressedData.length);
+ Bucket inBucket = new ArrayBucket(compressedData);
BucketFactory factory = new ArrayBucketFactory();
Bucket outBucket = null;
@@ -179,7 +179,7 @@
}
private byte[] doCompress(byte[] uncompressedData) {
- Bucket inBucket = new ArrayBucket(uncompressedData,
uncompressedData.length);
+ Bucket inBucket = new ArrayBucket(uncompressedData);
BucketFactory factory = new ArrayBucketFactory();
Bucket outBucket = null;