Author: nextgens
Date: 2008-08-25 22:16:26 +0000 (Mon, 25 Aug 2008)
New Revision: 22158

Modified:
   trunk/freenet/src/freenet/support/io/TempBucketFactory.java
Log:
TempBucket: attempt to solve the concurrency issues with multiple open streams

Modified: trunk/freenet/src/freenet/support/io/TempBucketFactory.java
===================================================================
--- trunk/freenet/src/freenet/support/io/TempBucketFactory.java 2008-08-25 
22:03:54 UTC (rev 22157)
+++ trunk/freenet/src/freenet/support/io/TempBucketFactory.java 2008-08-25 
22:16:26 UTC (rev 22158)
@@ -65,6 +65,7 @@
                private long currentSize;
                private OutputStream os = null;
                private InputStream is = null;
+               private short osIndex;
                private volatile boolean shouldResetOS = false;
                private volatile boolean shouldResetIS = false;
                public final Object sync = new Object();
@@ -75,6 +76,7 @@
                                throw new NullPointerException();
                        this.currentBucket = cur;
                        this.creationTime = now;
+                       this.osIndex = 0;
                }

                /** A blocking method to force-migrate from a RAMBucket to a 
FileBucket */
@@ -115,14 +117,22 @@
                public OutputStream getOutputStream() throws IOException {
                        synchronized(sync) {
                                shouldResetOS = true;
-                               return new TempBucketOutputStream();
+                               return new TempBucketOutputStream(++osIndex);
                        }
                }

                private class TempBucketOutputStream extends OutputStream {
                        private OutputStream currentOS;
+                       private final short idx;

+                       TempBucketOutputStream(short idx) {
+                               this.idx = idx;
+                       }
+                       
                        private void _maybeMigrateRamBucket(long futureSize) 
throws IOException {
+                               if(idx != osIndex)
+                                       throw new IOException("Should use the 
new OutputStream!");
+                               
                                if(isRAMBucket()) {
                                        boolean shouldMigrate = false;
                                        boolean isOversized = false;
@@ -204,15 +214,23 @@

                public synchronized InputStream getInputStream() throws 
IOException {
                        shouldResetIS = true;
-                       return new TempBucketInputStream();
+                       return new TempBucketInputStream(osIndex);
                }

                private class TempBucketInputStream extends InputStream {
                        private InputStream currentIS;
                        private long index = 0;
                        private long mark = 0;
+                       private final short idx;

+                       TempBucketInputStream(short idx) {
+                               this.idx = idx;
+                       }
+                       
                        private void _maybeResetInputStream() throws 
IOException {
+                               if(idx != osIndex)
+                                       throw new IOException("Should use the 
new InputStream!");
+                               
                                if(shouldResetIS) {
                                        Closer.close(currentIS);
                                        currentIS = (is == null ? 
currentBucket.getInputStream() : is);


Reply via email to