Author: nextgens
Date: 2008-08-25 23:06:22 +0000 (Mon, 25 Aug 2008)
New Revision: 22164
Modified:
trunk/freenet/src/freenet/support/io/TempBucketFactory.java
Log:
TempBucket: simplify the code even more... but it's still not working as
expected!
Modified: trunk/freenet/src/freenet/support/io/TempBucketFactory.java
===================================================================
--- trunk/freenet/src/freenet/support/io/TempBucketFactory.java 2008-08-25
22:45:37 UTC (rev 22163)
+++ trunk/freenet/src/freenet/support/io/TempBucketFactory.java 2008-08-25
23:06:22 UTC (rev 22164)
@@ -65,8 +65,8 @@
private long currentSize;
private OutputStream os = null;
private short osIndex;
- private volatile boolean shouldResetOS = false;
- private volatile boolean shouldResetIS = false;
+ private short shouldResetIS = 0;
+ private short isIndex;
public final Object sync = new Object();
public final long creationTime;
@@ -76,6 +76,7 @@
this.currentBucket = cur;
this.creationTime = now;
this.osIndex = 0;
+ this.isIndex = 0;
}
/** A blocking method to force-migrate from a RAMBucket to a
FileBucket */
@@ -88,14 +89,17 @@
toMigrate = currentBucket;
Bucket tempFB = _makeFileBucket();
+ if(os != null) {
+ os.flush();
+ os.close();
+ }
os = tempFB.getOutputStream();
BucketTools.copyTo(tempFB, os, currentSize);
if(toMigrate.isReadOnly())
tempFB.setReadOnly();
currentBucket = tempFB;
// We need streams to be reset to point to the
new bucket
- shouldResetOS = true;
- shouldResetIS = true;
+ shouldResetIS = isIndex;
}
if(logMINOR)
Logger.minor(this, "We have migrated
"+toMigrate.hashCode());
@@ -114,23 +118,21 @@
public OutputStream getOutputStream() throws IOException {
synchronized(sync) {
- shouldResetOS = true;
+ if(os != null)
+ throw new IOException("Only one
OutputStream per bucket!");
return new TempBucketOutputStream(++osIndex);
}
}
private class TempBucketOutputStream extends OutputStream {
- private OutputStream currentOS;
private final short idx;
- TempBucketOutputStream(short idx) {
+ TempBucketOutputStream(short idx) throws IOException {
this.idx = idx;
+ os = currentBucket.getOutputStream();
}
private void _maybeMigrateRamBucket(long futureSize)
throws IOException {
- if(idx != osIndex)
- throw new IOException("Should use the
new OutputStream!");
-
if(isRAMBucket()) {
boolean shouldMigrate = false;
boolean isOversized = false;
@@ -142,7 +144,6 @@
shouldMigrate = true;
if(shouldMigrate) {
- Closer.close(currentOS);
if(logMINOR) {
if(isOversized)
Logger.minor(this, "The bucket is over
"+SizeUtil.formatSize(maxRAMBucketSize*RAMBUCKET_CONVERSION_FACTOR)+": we will
force-migrate it to disk.");
@@ -154,22 +155,13 @@
}
}
- private void _maybeResetOutputStream() throws
IOException {
- if(shouldResetOS) {
- Closer.close(currentOS);
- currentOS = (os == null ?
currentBucket.getOutputStream() : os);
- shouldResetOS = false;
- }
- }
-
@Override
public final void write(int b) throws IOException {
synchronized(sync) {
- long futurSize = currentSize + 1;
- _maybeMigrateRamBucket(futurSize);
- _maybeResetOutputStream();
- currentOS.write(b);
- currentSize = futurSize;
+ long futureSize = currentSize + 1;
+ _maybeMigrateRamBucket(futureSize);
+ os.write(b);
+ currentSize = futureSize;
if(isRAMBucket()) // We need to
re-check because it might have changed!
_hasTaken(1);
}
@@ -180,8 +172,7 @@
synchronized(sync) {
long futureSize = currentSize + len;
_maybeMigrateRamBucket(futureSize);
- _maybeResetOutputStream();
- currentOS.write(b, off, len);
+ os.write(b, off, len);
currentSize = futureSize;
if(isRAMBucket()) // We need to
re-check because it might have changed!
_hasTaken(len);
@@ -192,8 +183,7 @@
public final void flush() throws IOException {
synchronized(sync) {
_maybeMigrateRamBucket(currentSize);
- _maybeResetOutputStream();
- currentOS.flush();
+ os.flush();
}
}
@@ -201,17 +191,16 @@
public final void close() throws IOException {
synchronized(sync) {
_maybeMigrateRamBucket(currentSize);
- _maybeResetOutputStream();
- if(currentOS != null) {
- currentOS.flush();
- Closer.close(currentOS);
+ if(os != null) {
+ os.flush();
+ os.close();
}
}
}
}
public synchronized InputStream getInputStream() throws
IOException {
- shouldResetIS = true;
+ isIndex++;
return new TempBucketInputStream(osIndex);
}
@@ -220,19 +209,20 @@
private long index = 0;
private final short idx;
- TempBucketInputStream(short idx) {
+ TempBucketInputStream(short idx) throws IOException {
this.idx = idx;
+ this.currentIS = currentBucket.getInputStream();
}
private void _maybeResetInputStream() throws
IOException {
if(idx != osIndex)
throw new IOException("Should use the
new InputStream!");
- if(shouldResetIS) {
+ if(shouldResetIS > 0) {
Closer.close(currentIS);
currentIS =
currentBucket.getInputStream();
currentIS.skip(index);
- shouldResetIS = false;
+ shouldResetIS--;
}
}
@@ -283,6 +273,7 @@
synchronized(sync) {
_maybeResetInputStream();
Closer.close(currentIS);
+ isIndex--;
}
}
}