Author: nextgens
Date: 2008-08-25 21:27:01 +0000 (Mon, 25 Aug 2008)
New Revision: 22149
Modified:
trunk/freenet/src/freenet/support/io/BucketTools.java
trunk/freenet/src/freenet/support/io/TempBucketFactory.java
Log:
TempBuckets: maybe fix the problem with OutputStreams
Modified: trunk/freenet/src/freenet/support/io/BucketTools.java
===================================================================
--- trunk/freenet/src/freenet/support/io/BucketTools.java 2008-08-25
20:41:10 UTC (rev 22148)
+++ trunk/freenet/src/freenet/support/io/BucketTools.java 2008-08-25
21:27:01 UTC (rev 22149)
@@ -294,6 +294,7 @@
}
} finally {
is.close();
+ os.flush();
}
}
Modified: trunk/freenet/src/freenet/support/io/TempBucketFactory.java
===================================================================
--- trunk/freenet/src/freenet/support/io/TempBucketFactory.java 2008-08-25
20:41:10 UTC (rev 22148)
+++ trunk/freenet/src/freenet/support/io/TempBucketFactory.java 2008-08-25
21:27:01 UTC (rev 22149)
@@ -63,6 +63,8 @@
public class TempBucket implements Bucket {
private Bucket currentBucket;
private long currentSize;
+ private OutputStream os = null;
+ private InputStream is = null;
private volatile boolean shouldResetOS = false;
private volatile boolean shouldResetIS = false;
public final long creationTime;
@@ -84,7 +86,9 @@
toMigrate = currentBucket;
Bucket tempFB = _makeFileBucket();
- BucketTools.copy(currentBucket, tempFB);
+ os = tempFB.getOutputStream();
+ is = tempFB.getInputStream();
+ BucketTools.copyTo(tempFB, os, currentSize);
if(toMigrate.isReadOnly())
tempFB.setReadOnly();
currentBucket = tempFB;
@@ -115,7 +119,7 @@
}
private class TempBucketOutputStream extends OutputStream {
- private OutputStream os;
+ private OutputStream currentOS;
private void _maybeMigrateRamBucket(long futureSize)
throws IOException {
if(isRAMBucket()) {
@@ -129,7 +133,7 @@
shouldMigrate = true;
if(shouldMigrate) {
- Closer.close(os);
+ Closer.close(currentOS);
if(logMINOR) {
if(isOversized)
Logger.minor(this, "The bucket is over
"+SizeUtil.formatSize(maxRAMBucketSize*RAMBUCKET_CONVERSION_FACTOR)+": we will
force-migrate it to disk.");
@@ -143,8 +147,8 @@
private void _maybeResetOutputStream() throws
IOException {
if(shouldResetOS) {
- Closer.close(os);
- os = currentBucket.getOutputStream();
+ Closer.close(currentOS);
+ currentOS = (os == null ?
currentBucket.getOutputStream() : os);
shouldResetOS = false;
}
}
@@ -155,7 +159,7 @@
long futurSize = currentSize + 1;
_maybeMigrateRamBucket(futurSize);
_maybeResetOutputStream();
- os.write(b);
+ currentOS.write(b);
currentSize = futurSize;
if(isRAMBucket()) // We need to
re-check because it might have changed!
_hasTaken(1);
@@ -168,7 +172,7 @@
long futurSize = currentSize + len;
_maybeMigrateRamBucket(futurSize);
_maybeResetOutputStream();
- os.write(b, off, len);
+ currentOS.write(b, off, len);
currentSize = futurSize;
if(isRAMBucket()) // We need to
re-check because it might have changed!
_hasTaken(len);
@@ -180,7 +184,7 @@
synchronized(currentBucket) {
_maybeMigrateRamBucket(currentSize);
_maybeResetOutputStream();
- os.flush();
+ currentOS.flush();
}
}
@@ -189,7 +193,7 @@
synchronized(currentBucket) {
_maybeMigrateRamBucket(currentSize);
_maybeResetOutputStream();
- Closer.close(os);
+ Closer.close(currentOS);
}
}
}
@@ -200,14 +204,14 @@
}
private class TempBucketInputStream extends InputStream {
- private InputStream is;
+ private InputStream currentIS;
private long index = 0;
private void _maybeResetInputStream() throws
IOException {
if(shouldResetIS) {
- Closer.close(is);
- is = currentBucket.getInputStream();
- is.skip(index);
+ Closer.close(currentIS);
+ currentIS = (is == null ?
currentBucket.getInputStream() : is);
+ currentIS.skip(index);
shouldResetIS = false;
}
}
@@ -216,7 +220,7 @@
public final int read() throws IOException {
synchronized(currentBucket) {
_maybeResetInputStream();
- int toReturn = is.read();
+ int toReturn = currentIS.read();
if(toReturn > -1)
index++;
return toReturn;
@@ -227,7 +231,7 @@
public long skip(long n) throws IOException {
synchronized(currentBucket) {
_maybeResetInputStream();
- return is.skip(n);
+ return currentIS.skip(n);
}
}
@@ -235,7 +239,7 @@
public int available() throws IOException {
synchronized(currentBucket) {
_maybeResetInputStream();
- return is.available();
+ return currentIS.available();
}
}
@@ -247,7 +251,7 @@
} catch (IOException e) {
Logger.error(this,
"IOE:"+e.getMessage(),e);
}
- is.mark(readlimit);
+ currentIS.mark(readlimit);
}
}
@@ -255,7 +259,7 @@
public void reset() throws IOException {
synchronized(currentBucket) {
_maybeResetInputStream();
- is.reset();
+ currentIS.reset();
}
}
@@ -267,7 +271,7 @@
} catch (IOException e) {
Logger.error(this,
"IOE:"+e.getMessage(),e);
}
- return is.markSupported();
+ return currentIS.markSupported();
}
}
@@ -275,7 +279,7 @@
public final void close() throws IOException {
synchronized(currentBucket) {
_maybeResetInputStream();
- Closer.close(is);
+ Closer.close(currentIS);
}
}
}