Author: nextgens
Date: 2008-08-25 21:27:01 +0000 (Mon, 25 Aug 2008)
New Revision: 22149

Modified:
   trunk/freenet/src/freenet/support/io/BucketTools.java
   trunk/freenet/src/freenet/support/io/TempBucketFactory.java
Log:
TempBuckets: maybe fix the problem with OutputStreams

Modified: trunk/freenet/src/freenet/support/io/BucketTools.java
===================================================================
--- trunk/freenet/src/freenet/support/io/BucketTools.java       2008-08-25 20:41:10 UTC (rev 22148)
+++ trunk/freenet/src/freenet/support/io/BucketTools.java       2008-08-25 21:27:01 UTC (rev 22149)
@@ -294,6 +294,7 @@
                        }
                } finally {
                        is.close();
+                       os.flush();
                }
        }


Modified: trunk/freenet/src/freenet/support/io/TempBucketFactory.java
===================================================================
--- trunk/freenet/src/freenet/support/io/TempBucketFactory.java 2008-08-25 20:41:10 UTC (rev 22148)
+++ trunk/freenet/src/freenet/support/io/TempBucketFactory.java 2008-08-25 21:27:01 UTC (rev 22149)
@@ -63,6 +63,8 @@
        public class TempBucket implements Bucket {
                private Bucket currentBucket;
                private long currentSize;
+               private OutputStream os = null;
+               private InputStream is = null;
                private volatile boolean shouldResetOS = false;
                private volatile boolean shouldResetIS = false;
                public final long creationTime;
@@ -84,7 +86,9 @@

                                toMigrate = currentBucket;
                                Bucket tempFB = _makeFileBucket();
-                               BucketTools.copy(currentBucket, tempFB);
+                               os = tempFB.getOutputStream();
+                               is = tempFB.getInputStream();
+                               BucketTools.copyTo(tempFB, os, currentSize);
                                if(toMigrate.isReadOnly())
                                        tempFB.setReadOnly();
                                currentBucket = tempFB;
@@ -115,7 +119,7 @@
                }

                private class TempBucketOutputStream extends OutputStream {
-                       private OutputStream os;
+                       private OutputStream currentOS;

                        private void _maybeMigrateRamBucket(long futureSize) throws IOException {
                                if(isRAMBucket()) {
@@ -129,7 +133,7 @@
                                                shouldMigrate = true;

                                        if(shouldMigrate) {
-                                               Closer.close(os);
+                                               Closer.close(currentOS);
                                                if(logMINOR) {
                                                        if(isOversized)
                                                                Logger.minor(this, "The bucket is over "+SizeUtil.formatSize(maxRAMBucketSize*RAMBUCKET_CONVERSION_FACTOR)+": we will force-migrate it to disk.");
@@ -143,8 +147,8 @@

                        private void _maybeResetOutputStream() throws IOException {
                                if(shouldResetOS) {
-                                       Closer.close(os);
-                                       os = currentBucket.getOutputStream();
+                                       Closer.close(currentOS);
+                                       currentOS = (os == null ? currentBucket.getOutputStream() : os);
                                        shouldResetOS = false;
                                }
                        }
@@ -155,7 +159,7 @@
                                        long futurSize = currentSize + 1;
                                        _maybeMigrateRamBucket(futurSize);
                                        _maybeResetOutputStream();
-                                       os.write(b);
+                                       currentOS.write(b);
                                        currentSize = futurSize;
                                        if(isRAMBucket()) // We need to re-check because it might have changed!
                                                _hasTaken(1);
@@ -168,7 +172,7 @@
                                        long futurSize = currentSize + len;
                                        _maybeMigrateRamBucket(futurSize);
                                        _maybeResetOutputStream();
-                                       os.write(b, off, len);
+                                       currentOS.write(b, off, len);
                                        currentSize = futurSize;
                                        if(isRAMBucket()) // We need to re-check because it might have changed!
                                                _hasTaken(len);
@@ -180,7 +184,7 @@
                                synchronized(currentBucket) {
                                        _maybeMigrateRamBucket(currentSize);
                                        _maybeResetOutputStream();
-                                       os.flush();
+                                       currentOS.flush();
                                }
                        }

@@ -189,7 +193,7 @@
                                synchronized(currentBucket) {
                                        _maybeMigrateRamBucket(currentSize);
                                        _maybeResetOutputStream();
-                                       Closer.close(os);
+                                       Closer.close(currentOS);
                                }
                        }
                }
@@ -200,14 +204,14 @@
                }

                private class TempBucketInputStream extends InputStream {
-                       private InputStream is;
+                       private InputStream currentIS;
                        private long index = 0;

                        private void _maybeResetInputStream() throws IOException {
                                if(shouldResetIS) {
-                                       Closer.close(is);
-                                       is = currentBucket.getInputStream();
-                                       is.skip(index);
+                                       Closer.close(currentIS);
+                                       currentIS = (is == null ? currentBucket.getInputStream() : is);
+                                       currentIS.skip(index);
                                        shouldResetIS = false;
                                }
                        }
@@ -216,7 +220,7 @@
                        public final int read() throws IOException {
                                synchronized(currentBucket) {
                                        _maybeResetInputStream();
-                                       int toReturn = is.read();
+                                       int toReturn = currentIS.read();
                                        if(toReturn > -1)
                                                index++;
                                        return toReturn;
@@ -227,7 +231,7 @@
                        public long skip(long n) throws IOException {
                                synchronized(currentBucket) {
                                        _maybeResetInputStream();
-                                       return is.skip(n);
+                                       return currentIS.skip(n);
                                }
                        }

@@ -235,7 +239,7 @@
                        public int available() throws IOException {
                                synchronized(currentBucket) {
                                        _maybeResetInputStream();
-                                       return is.available();
+                                       return currentIS.available();
                                }
                        }

@@ -247,7 +251,7 @@
                                        } catch (IOException e) {
                                                Logger.error(this, "IOE:"+e.getMessage(),e);
                                        }
-                                       is.mark(readlimit);
+                                       currentIS.mark(readlimit);
                                }
                        }

@@ -255,7 +259,7 @@
                        public void reset() throws IOException {
                                synchronized(currentBucket) {
                                        _maybeResetInputStream();
-                                       is.reset();
+                                       currentIS.reset();
                                }                               
                        }

@@ -267,7 +271,7 @@
                                        } catch (IOException e) {
                                                Logger.error(this, "IOE:"+e.getMessage(),e);
                                        }
-                                       return is.markSupported();
+                                       return currentIS.markSupported();
                                }
                        }

@@ -275,7 +279,7 @@
                        public final void close() throws IOException {
                                synchronized(currentBucket) {
                                        _maybeResetInputStream();
-                                       Closer.close(is);
+                                       Closer.close(currentIS);
                                }
                        }
                }


Reply via email to