Author: nextgens
Date: 2008-08-26 15:14:18 +0000 (Tue, 26 Aug 2008)
New Revision: 22178
Modified:
trunk/freenet/src/freenet/support/io/TempBucketFactory.java
Log:
TempBucketFactory: keep track of all our open InputStreams so that we can close
them before we free()... and we spare a few cpu cycles on each access as we can
force-reset them when we migrate buckets
Modified: trunk/freenet/src/freenet/support/io/TempBucketFactory.java
===================================================================
--- trunk/freenet/src/freenet/support/io/TempBucketFactory.java 2008-08-26
14:24:45 UTC (rev 22177)
+++ trunk/freenet/src/freenet/support/io/TempBucketFactory.java 2008-08-26
15:14:18 UTC (rev 22178)
@@ -18,6 +18,7 @@
import java.util.LinkedList;
import java.util.Queue;
import java.util.Random;
+import java.util.Vector;
import java.util.concurrent.LinkedBlockingQueue;
/**
@@ -67,6 +68,8 @@
private long currentSize;
/** A link to the "real" underlying outputStream, even if we
migrated */
private OutputStream os = null;
+ /** All the open-streams to reset or close on migration or
free() */
+ private final Vector<TempBucketInputStream> tbis;
/** An identifier used to know when to deprecate the
InputStreams */
private short osIndex;
/** A timestamp used to evaluate the age of the bucket and
maybe consider it for a migration */
@@ -78,8 +81,22 @@
this.currentBucket = cur;
this.creationTime = now;
this.osIndex = 0;
+ this.tbis = new Vector<TempBucketInputStream>();
}
+ private void closeInputStreams(boolean forFree) {
+ for(TempBucketInputStream is : tbis) {
+ try {
+ if(forFree)
+ is.close();
+ else
+ is._maybeResetInputStream();
+ } catch(IOException e) {
+ tbis.remove(is);
+ }
+ }
+ }
+
/** A blocking method to force-migrate from a RAMBucket to a
FileBucket */
private final void migrateToFileBucket() throws IOException {
Bucket toMigrate = null;
@@ -100,6 +117,9 @@
BucketTools.copyTo(toMigrate, os,
currentSize);
if(toMigrate.isReadOnly())
tempFB.setReadOnly();
+
+ closeInputStreams(false);
+
currentBucket = tempFB;
// We need streams to be reset to point to the
new bucket
}
@@ -199,7 +219,10 @@
public synchronized InputStream getInputStream() throws
IOException {
if(os == null)
throw new IOException("No OutputStream has been
openned! Why would you want an InputStream then?");
- return new TempBucketInputStream(osIndex);
+ TempBucketInputStream is = new
TempBucketInputStream(osIndex);
+ tbis.add(is);
+
+ return is;
}
private class TempBucketInputStream extends InputStream {
@@ -218,7 +241,7 @@
this.currentOS = os;
}
- private void _maybeResetInputStream() throws
IOException {
+ public void _maybeResetInputStream() throws IOException
{
if(idx != osIndex)
throw new IOException("Should use the
new InputStream!");
@@ -233,7 +256,6 @@
@Override
public final int read() throws IOException {
synchronized(TempBucket.this) {
- _maybeResetInputStream();
int toReturn = currentIS.read();
if(toReturn != -1)
index++;
@@ -251,7 +273,6 @@
@Override
public int read(byte b[], int off, int len) throws
IOException {
synchronized(TempBucket.this) {
- _maybeResetInputStream();
int toReturn = currentIS.read(b, off,
len);
if(toReturn > 0)
index += toReturn;
@@ -262,7 +283,6 @@
@Override
public long skip(long n) throws IOException {
synchronized(TempBucket.this) {
- _maybeResetInputStream();
long skipped = currentIS.skip(n);
index += skipped;
return skipped;
@@ -277,8 +297,8 @@
@Override
public final void close() throws IOException {
synchronized(TempBucket.this) {
- _maybeResetInputStream();
Closer.close(currentIS);
+ tbis.remove(this);
}
}
}
@@ -300,6 +320,8 @@
}
public synchronized void free() {
+ closeInputStreams(true);
+ Closer.close(os);
currentBucket.free();
if(isRAMBucket())
_hasFreed(currentSize);