Author: nextgens
Date: 2008-08-26 15:14:18 +0000 (Tue, 26 Aug 2008)
New Revision: 22178

Modified:
   trunk/freenet/src/freenet/support/io/TempBucketFactory.java
Log:
TempBucketFactory: keep track of all our open InputStreams so that we can close 
them before we free()... and we spare a few cpu cycles on each access as we can 
force-reset them when we migrate buckets

Modified: trunk/freenet/src/freenet/support/io/TempBucketFactory.java
===================================================================
--- trunk/freenet/src/freenet/support/io/TempBucketFactory.java 2008-08-26 
14:24:45 UTC (rev 22177)
+++ trunk/freenet/src/freenet/support/io/TempBucketFactory.java 2008-08-26 
15:14:18 UTC (rev 22178)
@@ -18,6 +18,7 @@
 import java.util.LinkedList;
 import java.util.Queue;
 import java.util.Random;
+import java.util.Vector;
 import java.util.concurrent.LinkedBlockingQueue;

 /**
@@ -67,6 +68,8 @@
                private long currentSize;
                /** A link to the "real" underlying outputStream, even if we 
migrated */
                private OutputStream os = null;
+               /** All the open-streams to reset or close on migration or 
free() */
+               private final Vector<TempBucketInputStream> tbis;
                /** An identifier used to know when to deprecate the 
InputStreams */
                private short osIndex;
                /** A timestamp used to evaluate the age of the bucket and 
maybe consider it for a migration */
@@ -78,8 +81,22 @@
                        this.currentBucket = cur;
                        this.creationTime = now;
                        this.osIndex = 0;
+                       this.tbis = new Vector<TempBucketInputStream>();
                }

+               private void closeInputStreams(boolean forFree) {
+                       for(TempBucketInputStream is : tbis) {
+                               try {
+                                       if(forFree)
+                                               is.close();
+                                       else
+                                               is._maybeResetInputStream();
+                               } catch(IOException e) {
+                                       tbis.remove(is);
+                               }
+                       }
+               }
+               
                /** A blocking method to force-migrate from a RAMBucket to a 
FileBucket */
                private final void migrateToFileBucket() throws IOException {
                        Bucket toMigrate = null;
@@ -100,6 +117,9 @@
                                        BucketTools.copyTo(toMigrate, os, 
currentSize);
                                if(toMigrate.isReadOnly())
                                        tempFB.setReadOnly();
+                               
+                               closeInputStreams(false);
+                               
                                currentBucket = tempFB;
                                // We need streams to be reset to point to the 
new bucket
                        }
@@ -199,7 +219,10 @@
                public synchronized InputStream getInputStream() throws 
IOException {
                        if(os == null)
                                throw new IOException("No OutputStream has been 
openned! Why would you want an InputStream then?");
-                       return new TempBucketInputStream(osIndex);
+                       TempBucketInputStream is = new 
TempBucketInputStream(osIndex);
+                       tbis.add(is);
+                       
+                       return is;
                }

                private class TempBucketInputStream extends InputStream {
@@ -218,7 +241,7 @@
                                this.currentOS = os;
                        }

-                       private void _maybeResetInputStream() throws 
IOException {
+                       public void _maybeResetInputStream() throws IOException 
{
                                if(idx != osIndex)
                                        throw new IOException("Should use the 
new InputStream!");

@@ -233,7 +256,6 @@
                        @Override
                        public final int read() throws IOException {
                                synchronized(TempBucket.this) {
-                                       _maybeResetInputStream();
                                        int toReturn = currentIS.read();
                                        if(toReturn != -1)
                                                index++;
@@ -251,7 +273,6 @@
                        @Override
                        public int read(byte b[], int off, int len) throws 
IOException {
                                synchronized(TempBucket.this) {
-                                       _maybeResetInputStream();
                                        int toReturn = currentIS.read(b, off, 
len);
                                        if(toReturn > 0)
                                                index += toReturn;
@@ -262,7 +283,6 @@
                        @Override
                        public long skip(long n) throws IOException {
                                synchronized(TempBucket.this) {
-                                       _maybeResetInputStream();
                                        long skipped = currentIS.skip(n);
                                        index += skipped;
                                        return skipped;
@@ -277,8 +297,8 @@
                        @Override
                        public final void close() throws IOException {
                                synchronized(TempBucket.this) {
-                                       _maybeResetInputStream();
                                        Closer.close(currentIS);
+                                       tbis.remove(this);
                                }
                        }
                }
@@ -300,6 +320,8 @@
                }

                public synchronized void free() {
+                       closeInputStreams(true);
+                       Closer.close(os);
                        currentBucket.free();
                        if(isRAMBucket())
                                _hasFreed(currentSize);


Reply via email to