Author: nextgens
Date: 2008-08-25 23:06:22 +0000 (Mon, 25 Aug 2008)
New Revision: 22164

Modified:
   trunk/freenet/src/freenet/support/io/TempBucketFactory.java
Log:
TempBucket: simplify the code even more... but it's still not working as 
expected!

Modified: trunk/freenet/src/freenet/support/io/TempBucketFactory.java
===================================================================
--- trunk/freenet/src/freenet/support/io/TempBucketFactory.java 2008-08-25 
22:45:37 UTC (rev 22163)
+++ trunk/freenet/src/freenet/support/io/TempBucketFactory.java 2008-08-25 
23:06:22 UTC (rev 22164)
@@ -65,8 +65,8 @@
                private long currentSize;
                private OutputStream os = null;
                private short osIndex;
-               private volatile boolean shouldResetOS = false;
-               private volatile boolean shouldResetIS = false;
+               private short shouldResetIS = 0;
+               private short isIndex;
                public final Object sync = new Object();
                public final long creationTime;

@@ -76,6 +76,7 @@
                        this.currentBucket = cur;
                        this.creationTime = now;
                        this.osIndex = 0;
+                       this.isIndex = 0;
                }

                /** A blocking method to force-migrate from a RAMBucket to a 
FileBucket */
@@ -88,14 +89,17 @@

                                toMigrate = currentBucket;
                                Bucket tempFB = _makeFileBucket();
+                               if(os != null) {
+                                       os.flush();
+                                       os.close();
+                               }
                                os = tempFB.getOutputStream();
                                BucketTools.copyTo(tempFB, os, currentSize);
                                if(toMigrate.isReadOnly())
                                        tempFB.setReadOnly();
                                currentBucket = tempFB;
                                // We need streams to be reset to point to the 
new bucket
-                               shouldResetOS = true;
-                               shouldResetIS = true;
+                               shouldResetIS = isIndex;
                        }
                        if(logMINOR)
                                Logger.minor(this, "We have migrated 
"+toMigrate.hashCode());
@@ -114,23 +118,21 @@

                public OutputStream getOutputStream() throws IOException {
                        synchronized(sync) {
-                               shouldResetOS = true;
+                               if(os != null)
+                                       throw new IOException("Only one 
OutputStream per bucket!");
                                return new TempBucketOutputStream(++osIndex);
                        }
                }

                private class TempBucketOutputStream extends OutputStream {
-                       private OutputStream currentOS;
                        private final short idx;

-                       TempBucketOutputStream(short idx) {
+                       TempBucketOutputStream(short idx) throws IOException {
                                this.idx = idx;
+                               os = currentBucket.getOutputStream();
                        }

                        private void _maybeMigrateRamBucket(long futureSize) 
throws IOException {
-                               if(idx != osIndex)
-                                       throw new IOException("Should use the 
new OutputStream!");
-                               
                                if(isRAMBucket()) {
                                        boolean shouldMigrate = false;
                                        boolean isOversized = false;
@@ -142,7 +144,6 @@
                                                shouldMigrate = true;

                                        if(shouldMigrate) {
-                                               Closer.close(currentOS);
                                                if(logMINOR) {
                                                        if(isOversized)
                                                                
Logger.minor(this, "The bucket is over 
"+SizeUtil.formatSize(maxRAMBucketSize*RAMBUCKET_CONVERSION_FACTOR)+": we will 
force-migrate it to disk.");
@@ -154,22 +155,13 @@
                                }
                        }

-                       private void _maybeResetOutputStream() throws 
IOException {
-                               if(shouldResetOS) {
-                                       Closer.close(currentOS);
-                                       currentOS = (os == null ? 
currentBucket.getOutputStream() : os);
-                                       shouldResetOS = false;
-                               }
-                       }
-                       
                        @Override
                        public final void write(int b) throws IOException {
                                synchronized(sync) {
-                                       long futurSize = currentSize + 1;
-                                       _maybeMigrateRamBucket(futurSize);
-                                       _maybeResetOutputStream();
-                                       currentOS.write(b);
-                                       currentSize = futurSize;
+                                       long futureSize = currentSize + 1;
+                                       _maybeMigrateRamBucket(futureSize);
+                                       os.write(b);
+                                       currentSize = futureSize;
                                        if(isRAMBucket()) // We need to 
re-check because it might have changed!
                                                _hasTaken(1);
                                }
@@ -180,8 +172,7 @@
                                synchronized(sync) {
                                        long futureSize = currentSize + len;
                                        _maybeMigrateRamBucket(futureSize);
-                                       _maybeResetOutputStream();
-                                       currentOS.write(b, off, len);
+                                       os.write(b, off, len);
                                        currentSize = futureSize;
                                        if(isRAMBucket()) // We need to 
re-check because it might have changed!
                                                _hasTaken(len);
@@ -192,8 +183,7 @@
                        public final void flush() throws IOException {
                                synchronized(sync) {
                                        _maybeMigrateRamBucket(currentSize);
-                                       _maybeResetOutputStream();
-                                       currentOS.flush();
+                                       os.flush();
                                }
                        }

@@ -201,17 +191,16 @@
                        public final void close() throws IOException {
                                synchronized(sync) {
                                        _maybeMigrateRamBucket(currentSize);
-                                       _maybeResetOutputStream();
-                                       if(currentOS != null) {
-                                               currentOS.flush();
-                                               Closer.close(currentOS);
+                                       if(os != null) {
+                                               os.flush();
+                                               os.close();
                                        }
                                }
                        }
                }

                public synchronized InputStream getInputStream() throws 
IOException {
-                       shouldResetIS = true;
+                       isIndex++;
                        return new TempBucketInputStream(osIndex);
                }

@@ -220,19 +209,20 @@
                        private long index = 0;
                        private final short idx;

-                       TempBucketInputStream(short idx) {
+                       TempBucketInputStream(short idx) throws IOException {
                                this.idx = idx;
+                               this.currentIS = currentBucket.getInputStream();
                        }

                        private void _maybeResetInputStream() throws 
IOException {
                                if(idx != osIndex)
                                        throw new IOException("Should use the 
new InputStream!");

-                               if(shouldResetIS) {
+                               if(shouldResetIS > 0) {
                                        Closer.close(currentIS);
                                        currentIS = 
currentBucket.getInputStream();
                                        currentIS.skip(index);
-                                       shouldResetIS = false;
+                                       shouldResetIS--;
                                }
                        }

@@ -283,6 +273,7 @@
                                synchronized(sync) {
                                        _maybeResetInputStream();
                                        Closer.close(currentIS);
+                                       isIndex--;
                                }
                        }
                }


Reply via email to