Author: nextgens
Date: 2008-08-26 07:54:24 +0000 (Tue, 26 Aug 2008)
New Revision: 22167
Modified:
trunk/freenet/src/freenet/support/io/TempBucketFactory.java
Log:
TempBucket: simplify the logic even more, add comments
Modified: trunk/freenet/src/freenet/support/io/TempBucketFactory.java
===================================================================
--- trunk/freenet/src/freenet/support/io/TempBucketFactory.java 2008-08-25
23:12:06 UTC (rev 22166)
+++ trunk/freenet/src/freenet/support/io/TempBucketFactory.java 2008-08-26
07:54:24 UTC (rev 22167)
@@ -61,13 +61,16 @@
private final int RAMBUCKET_CONVERSION_FACTOR = 4;
public class TempBucket implements Bucket {
+ /** The underlying bucket itself */
private Bucket currentBucket;
+ /** We have to account the size of the underlying bucket
ourselves in order to be able to access it quickly */
private long currentSize;
+ /** A link to the "real" underlying outputStream, even if we
migrated */
private OutputStream os = null;
+ /** An identifier used to know when to deprecate the
InputStreams */
private short osIndex;
- private short shouldResetIS = 0;
- private short isIndex;
public final Object sync = new Object();
+ /** A timestamp used to evaluate the age of the bucket and
maybe consider it for a migration */
public final long creationTime;
public TempBucket(long now, Bucket cur) {
@@ -76,7 +79,6 @@
this.currentBucket = cur;
this.creationTime = now;
this.osIndex = 0;
- this.isIndex = 0;
}
/** A blocking method to force-migrate from a RAMBucket to a
FileBucket */
@@ -93,13 +95,14 @@
os.flush();
os.close();
}
+ // Update the global link so that all streams
will be reset
+ // DO NOT INCREMENT THE osIndex HERE!
os = tempFB.getOutputStream();
BucketTools.copyTo(toMigrate, os, currentSize);
if(toMigrate.isReadOnly())
tempFB.setReadOnly();
currentBucket = tempFB;
// We need streams to be reset to point to the
new bucket
- shouldResetIS = isIndex;
}
if(logMINOR)
Logger.minor(this, "We have migrated
"+toMigrate.hashCode());
@@ -191,38 +194,42 @@
public final void close() throws IOException {
synchronized(sync) {
_maybeMigrateRamBucket(currentSize);
- if(os != null) {
- os.flush();
- os.close();
- }
+ os.flush();
+ os.close();
+ // DO NOT NULL os OUT HERE!
}
}
}
public synchronized InputStream getInputStream() throws
IOException {
- isIndex++;
return new TempBucketInputStream(osIndex);
}
private class TempBucketInputStream extends InputStream {
+ /** The current InputStream we use from the underlying
bucket */
private InputStream currentIS;
+ /** Keep a link to the current OutputStream to know
when to reset the stream */
+ private OutputStream currentOS;
+ /** Keep a counter to know where we are on the stream
(useful when we have to reset and skip) */
private long index = 0;
+ /** Will change if a new OutputStream is opened: used
to detect deprecation */
private final short idx;
TempBucketInputStream(short idx) throws IOException {
this.idx = idx;
this.currentIS = currentBucket.getInputStream();
+ this.currentOS = os;
}
private void _maybeResetInputStream() throws
IOException {
if(idx != osIndex)
throw new IOException("Should use the
new InputStream!");
- if(shouldResetIS > 0) {
+ if(currentOS != os) {
Closer.close(currentIS);
currentIS =
currentBucket.getInputStream();
currentIS.skip(index);
- shouldResetIS--;
+ currentOS = os;
}
}
@@ -247,7 +254,7 @@
synchronized(sync) {
_maybeResetInputStream();
int toReturn = currentIS.read(b, off,
len);
- if(toReturn > -1)
+ if(toReturn > 0)
index += len;
return toReturn;
}
@@ -273,7 +280,6 @@
synchronized(sync) {
_maybeResetInputStream();
Closer.close(currentIS);
- isIndex--;
}
}
}