Author: nextgens
Date: 2008-04-24 16:19:07 +0000 (Thu, 24 Apr 2008)
New Revision: 19544

Modified:
   trunk/freenet/src/freenet/client/FECCodec.java
Log:
Hopefully fix #2287: Freenet uses too many file descriptors (fds) when starting
a 2G+ insert, which results in a failure to persist

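Streams are no longer kept open in arrays for the whole encode/decode; each one is opened only when needed and then closed, or left to the GC, as soon as possible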

Modified: trunk/freenet/src/freenet/client/FECCodec.java
===================================================================
--- trunk/freenet/src/freenet/client/FECCodec.java      2008-04-24 13:43:50 UTC 
(rev 19543)
+++ trunk/freenet/src/freenet/client/FECCodec.java      2008-04-24 16:19:07 UTC 
(rev 19544)
@@ -7,7 +7,6 @@
 import java.io.IOException;
 import java.io.OutputStream;
 import java.util.LinkedList;
-import java.util.NoSuchElementException;

 import com.onionnetworks.fec.FECCode;
 import com.onionnetworks.util.Buffer;
@@ -20,6 +19,7 @@
 import freenet.support.io.BucketTools;
 import freenet.support.io.Closer;
 import freenet.support.io.NativeThread;
+import java.util.ArrayList;

 /**
  * FEC (forward error correction) handler.
@@ -97,12 +97,10 @@
                        throw new IllegalArgumentException();
                Buffer[] packets = new Buffer[k];
                Bucket[] buckets = new Bucket[n];
-               DataInputStream[] readers = new DataInputStream[n];
-               OutputStream[] writers = new OutputStream[k];
                int numberToDecode = 0; // can be less than n-k
+               ArrayList existingReaders = new ArrayList();
+               ArrayList existingWriters = new ArrayList();

-               try {
-
                        byte[] realBuffer = new byte[k * STRIPE_SIZE];

                        int[] packetIndexes = new int[k];
@@ -114,15 +112,14 @@
                        for(int i = 0; i < k; i++)
                                packets[i] = new Buffer(realBuffer, i * 
STRIPE_SIZE,
                                        STRIPE_SIZE);
-
+                       
                        for(int i = 0; i < dataBlockStatus.length; i++) {
                                buckets[i] = dataBlockStatus[i].getData();
                                if(buckets[i] == null) {
                                        buckets[i] = bf.makeBucket(blockLength);
-                                       writers[i] = 
buckets[i].getOutputStream();
+                                       existingWriters.add(new Integer(i));
                                        if(logMINOR)
                                                Logger.minor(this, "writers[" + 
i + "] != null");
-                                       readers[i] = null;
                                        numberToDecode++;
                                }
                                else {
@@ -137,17 +134,14 @@
                                        }
                                        if(logMINOR)
                                                Logger.minor(this, "writers[" + 
i + "] = null (already filled)");
-                                       writers[i] = null;
-                                       readers[i] = new 
DataInputStream(buckets[i].getInputStream());
+                                       existingReaders.add(new Integer(i));
                                        packetIndexes[idx++] = i;
                                }
                        }
                        for(int i = 0; i < checkBlockStatus.length; i++) {
                                buckets[i + k] = checkBlockStatus[i].getData();
-                               if(buckets[i + k] == null)
-                                       readers[i + k] = null;
-                               else {
-                                       readers[i + k] = new 
DataInputStream(buckets[i + k].getInputStream());
+                               if(buckets[i + k] != null) {
+                                       existingReaders.add(new Integer(i+k));
                                        if(idx < k)
                                                packetIndexes[idx++] = i + k;
                                }
@@ -160,15 +154,18 @@
                                for(int i = 0; i < packetIndexes.length; i++)
                                        Logger.minor(this, "[" + i + "] = " + 
packetIndexes[i]);

-                       if(numberToDecode > 0)
+                       if(numberToDecode > 0) {
                                // Do the (striped) decode

                                for(int offset = 0; offset < blockLength; 
offset += STRIPE_SIZE) {
                                        // Read the data in first
                                        for(int i = 0; i < k; i++) {
                                                int x = packetIndexes[i];
-                                               
readers[x].readFully(realBuffer, i * STRIPE_SIZE,
-                                                       STRIPE_SIZE);
+                                               if(existingReaders.contains(new 
Integer(x))) {
+                                                       DataInputStream dis = 
new DataInputStream(buckets[i].getInputStream());
+                                                       
dis.readFully(realBuffer, i * STRIPE_SIZE, STRIPE_SIZE);
+                                                       Closer.close(dis);
+                                               }
                                        }
                                        // Do the decode
                                        // Not shuffled
@@ -178,19 +175,16 @@
                                        fec.decode(packets, disposableIndexes);
                                        // packets now contains an array of 
decoded blocks, in order
                                        // Write the data out
-                                       for(int i = 0; i < k; i++)
-                                               if(writers[i] != null)
-                                                       
writers[i].write(realBuffer, i * STRIPE_SIZE,
-                                                               STRIPE_SIZE);
+                                       for(int i = 0; i < k; i++) {
+                                               if(existingWriters.contains(new 
Integer(i))) {
+                                                       OutputStream dos = 
buckets[i].getOutputStream();
+                                                       dos.write(realBuffer, i 
* STRIPE_SIZE, STRIPE_SIZE);
+                                               }
+                                       }
                                }
+                       }

-               }
-               finally {
-                       for(int i = 0; i < k; i++)
-                               Closer.close(writers[i]);
-                       for(int i = 0; i < n; i++)
-                               Closer.close(readers[i]);
-               }
+                                       
                // Set new buckets only after have a successful decode.
                // Note that the last data bucket will be overwritten padded.
                for(int i = 0; i < dataBlockStatus.length; i++) {
@@ -222,11 +216,7 @@
                Buffer[] dataPackets = new Buffer[k];
                Buffer[] checkPackets = new Buffer[n - k];
                Bucket[] buckets = new Bucket[n];
-               DataInputStream[] readers = new DataInputStream[k];
-               OutputStream[] writers = new OutputStream[n - k];

-               try {
-
                        int[] toEncode = new int[n - k];
                        int numberToEncode = 0; // can be less than n-k

@@ -250,20 +240,18 @@
                                        else
                                                throw new 
IllegalArgumentException("Too big: " + sz + " bigger than " + blockLength);
                                }
-                               readers[i] = new 
DataInputStream(buckets[i].getInputStream());
                        }
-
+                       
+                       ArrayList existingWriters = new ArrayList();
                        for(int i = 0; i < checkBlockStatus.length; i++) {
                                buckets[i + k] = checkBlockStatus[i];
                                if(buckets[i + k] == null) {
                                        buckets[i + k] = 
bf.makeBucket(blockLength);
-                                       writers[i] = buckets[i + 
k].getOutputStream();
                                        toEncode[numberToEncode++] = i + k;
-                               }
-                               else
-                                       writers[i] = null;
+                                       existingWriters.add(new Integer(i));
+                               }       
                        }
-
+                       
                        //                      Runtime.getRuntime().gc();
 //                     Runtime.getRuntime().runFinalization();
 //                     Runtime.getRuntime().gc();
@@ -280,9 +268,11 @@
                                        if(logMINOR)
                                                Logger.minor(this, "Memory in 
use before read: " + memUsedBeforeRead);
                                        // Read the data in first
-                                       for(int i = 0; i < k; i++)
-                                               
readers[i].readFully(realBuffer, i * STRIPE_SIZE,
-                                                       STRIPE_SIZE);
+                                       for(int i = 0; i < k; i++) {
+                                               DataInputStream dis = new 
DataInputStream(buckets[i].getInputStream());
+                                               dis.readFully(realBuffer, i * 
STRIPE_SIZE, STRIPE_SIZE);
+                                               Closer.close(dis);
+                                       }
                                        // Do the encode
                                        // Not shuffled
                                        long startTime = 
System.currentTimeMillis();
@@ -306,19 +296,18 @@
                                                Logger.minor(this, "Stripe 
encode took " + (endTime - startTime) + "ms for k=" + k + ", n=" + n + ", 
stripeSize=" + STRIPE_SIZE);
                                        // packets now contains an array of 
decoded blocks, in order
                                        // Write the data out
-                                       for(int i = k; i < n; i++)
-                                               if(writers[i - k] != null)
-                                                       writers[i - 
k].write(realBuffer, i * STRIPE_SIZE,
-                                                               STRIPE_SIZE);
+                                       
+                                       for(int i = k; i < n; i++) {
+                                               int index = i-k;
+                                               
+                                               if(existingWriters.contains(new 
Integer(index))) {
+                                                       OutputStream os = 
buckets[i].getOutputStream();
+                                                       os.write(realBuffer, i 
* STRIPE_SIZE, STRIPE_SIZE);
+                                                       Closer.close(os);
+                                               }
+                                       }
                                }

-               }
-               finally {
-                       for(int i = 0; i < k; i++)
-                               Closer.close(readers[i]);
-                       for(int i = 0; i < n - k; i++)
-                               Closer.close(writers[i]);
-               }
                // Set new buckets only after have a successful decode.
                for(int i = 0; i < checkBlockStatus.length; i++) {
                        Bucket data = buckets[i + k];


Reply via email to