Author: nextgens
Date: 2008-08-25 22:16:26 +0000 (Mon, 25 Aug 2008)
New Revision: 22158
Modified:
trunk/freenet/src/freenet/support/io/TempBucketFactory.java
Log:
TempBucket: attempt to solve the concurrency issues with multiple open streams
Modified: trunk/freenet/src/freenet/support/io/TempBucketFactory.java
===================================================================
--- trunk/freenet/src/freenet/support/io/TempBucketFactory.java 2008-08-25 22:03:54 UTC (rev 22157)
+++ trunk/freenet/src/freenet/support/io/TempBucketFactory.java 2008-08-25 22:16:26 UTC (rev 22158)
@@ -65,6 +65,7 @@
private long currentSize;
private OutputStream os = null;
private InputStream is = null;
+ private short osIndex;
private volatile boolean shouldResetOS = false;
private volatile boolean shouldResetIS = false;
public final Object sync = new Object();
@@ -75,6 +76,7 @@
throw new NullPointerException();
this.currentBucket = cur;
this.creationTime = now;
+ this.osIndex = 0;
}
/** A blocking method to force-migrate from a RAMBucket to a FileBucket */
@@ -115,14 +117,22 @@
public OutputStream getOutputStream() throws IOException {
synchronized(sync) {
shouldResetOS = true;
- return new TempBucketOutputStream();
+ return new TempBucketOutputStream(++osIndex);
}
}
private class TempBucketOutputStream extends OutputStream {
private OutputStream currentOS;
+ private final short idx;
+ TempBucketOutputStream(short idx) {
+ this.idx = idx;
+ }
+
private void _maybeMigrateRamBucket(long futureSize)
throws IOException {
+ if(idx != osIndex)
+ throw new IOException("Should use the new OutputStream!");
+
if(isRAMBucket()) {
boolean shouldMigrate = false;
boolean isOversized = false;
@@ -204,15 +214,23 @@
public synchronized InputStream getInputStream() throws
IOException {
shouldResetIS = true;
- return new TempBucketInputStream();
+ return new TempBucketInputStream(osIndex);
}
private class TempBucketInputStream extends InputStream {
private InputStream currentIS;
private long index = 0;
private long mark = 0;
+ private final short idx;
+ TempBucketInputStream(short idx) {
+ this.idx = idx;
+ }
+
private void _maybeResetInputStream() throws
IOException {
+ if(idx != osIndex)
+ throw new IOException("Should use the new InputStream!");
+
if(shouldResetIS) {
Closer.close(currentIS);
currentIS = (is == null ?
currentBucket.getInputStream() : is);