On Thursday 24 April 2008 17:19, nextgens at freenetproject.org wrote:
> Author: nextgens
> Date: 2008-04-24 16:19:07 +0000 (Thu, 24 Apr 2008)
> New Revision: 19544
> 
> Modified:
>    trunk/freenet/src/freenet/client/FECCodec.java
> Log:
> Hopefully fix #2287: Freenet uses too many fd's when starting a 2G+ insert,
> results in failure to persist
> 
> Now we leave it up to the GC ASAP

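This isn't going to work. Bucket.getOutputStream() doesn't append - at least,
that's always been my assumption - and Bucket.getInputStream() *ALWAYS* starts
at zero. Since the streams are now reopened for every stripe, each stripe would
re-read from, and write over, the start of the bucket. I expect this commit to
seriously break things - did you test it?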

It is conceivable that we'd run out of fd's on a quad core system (192 fd's
per encode/decode, 4 at once), but IMHO it's unlikely - if we're running out
of fd's, most likely there's a leak, surely?
> 
> Modified: trunk/freenet/src/freenet/client/FECCodec.java
> ===================================================================
> --- trunk/freenet/src/freenet/client/FECCodec.java    2008-04-24 13:43:50 UTC 
(rev 19543)
> +++ trunk/freenet/src/freenet/client/FECCodec.java    2008-04-24 16:19:07 UTC 
(rev 19544)
> @@ -7,7 +7,6 @@
>  import java.io.IOException;
>  import java.io.OutputStream;
>  import java.util.LinkedList;
> -import java.util.NoSuchElementException;
>  
>  import com.onionnetworks.fec.FECCode;
>  import com.onionnetworks.util.Buffer;
> @@ -20,6 +19,7 @@
>  import freenet.support.io.BucketTools;
>  import freenet.support.io.Closer;
>  import freenet.support.io.NativeThread;
> +import java.util.ArrayList;
>  
>  /**
>   * FEC (forward error correction) handler.
> @@ -97,12 +97,10 @@
>                       throw new IllegalArgumentException();
>               Buffer[] packets = new Buffer[k];
>               Bucket[] buckets = new Bucket[n];
> -             DataInputStream[] readers = new DataInputStream[n];
> -             OutputStream[] writers = new OutputStream[k];
>               int numberToDecode = 0; // can be less than n-k
> +             ArrayList existingReaders = new ArrayList();
> +             ArrayList existingWriters = new ArrayList();
>  
> -             try {
> -
>                       byte[] realBuffer = new byte[k * STRIPE_SIZE];
>  
>                       int[] packetIndexes = new int[k];
> @@ -114,15 +112,14 @@
>                       for(int i = 0; i < k; i++)
>                               packets[i] = new Buffer(realBuffer, i * 
> STRIPE_SIZE,
>                                       STRIPE_SIZE);
> -
> +                     
>                       for(int i = 0; i < dataBlockStatus.length; i++) {
>                               buckets[i] = dataBlockStatus[i].getData();
>                               if(buckets[i] == null) {
>                                       buckets[i] = bf.makeBucket(blockLength);
> -                                     writers[i] = 
> buckets[i].getOutputStream();
> +                                     existingWriters.add(new Integer(i));
>                                       if(logMINOR)
>                                               Logger.minor(this, "writers[" + 
> i + "] != null");
> -                                     readers[i] = null;
>                                       numberToDecode++;
>                               }
>                               else {
> @@ -137,17 +134,14 @@
>                                       }
>                                       if(logMINOR)
>                                               Logger.minor(this, "writers[" + 
> i + "] = null (already filled)");
> -                                     writers[i] = null;
> -                                     readers[i] = new 
> DataInputStream(buckets[i].getInputStream());
> +                                     existingReaders.add(new Integer(i));
>                                       packetIndexes[idx++] = i;
>                               }
>                       }
>                       for(int i = 0; i < checkBlockStatus.length; i++) {
>                               buckets[i + k] = checkBlockStatus[i].getData();
> -                             if(buckets[i + k] == null)
> -                                     readers[i + k] = null;
> -                             else {
> -                                     readers[i + k] = new 
> DataInputStream(buckets[i + k].getInputStream());
> +                             if(buckets[i + k] != null) {
> +                                     existingReaders.add(new Integer(i+k));
>                                       if(idx < k)
>                                               packetIndexes[idx++] = i + k;
>                               }
> @@ -160,15 +154,18 @@
>                               for(int i = 0; i < packetIndexes.length; i++)
>                                       Logger.minor(this, "[" + i + "] = " + 
> packetIndexes[i]);
>  
> -                     if(numberToDecode > 0)
> +                     if(numberToDecode > 0) {
>                               // Do the (striped) decode
>  
>                               for(int offset = 0; offset < blockLength; 
> offset += STRIPE_SIZE) {
>                                       // Read the data in first
>                                       for(int i = 0; i < k; i++) {
>                                               int x = packetIndexes[i];
> -                                             
> readers[x].readFully(realBuffer, i * STRIPE_SIZE,
> -                                                     STRIPE_SIZE);
> +                                             if(existingReaders.contains(new 
> Integer(x))) {
> +                                                     DataInputStream dis = 
> new 
DataInputStream(buckets[i].getInputStream());
> +                                                     
> dis.readFully(realBuffer, i * STRIPE_SIZE, STRIPE_SIZE);
> +                                                     Closer.close(dis);
> +                                             }
>                                       }
>                                       // Do the decode
>                                       // Not shuffled
> @@ -178,19 +175,16 @@
>                                       fec.decode(packets, disposableIndexes);
>                                       // packets now contains an array of 
> decoded blocks, in order
>                                       // Write the data out
> -                                     for(int i = 0; i < k; i++)
> -                                             if(writers[i] != null)
> -                                                     
> writers[i].write(realBuffer, i * STRIPE_SIZE,
> -                                                             STRIPE_SIZE);
> +                                     for(int i = 0; i < k; i++) {
> +                                             if(existingWriters.contains(new 
> Integer(i))) {
> +                                                     OutputStream dos = 
> buckets[i].getOutputStream();
> +                                                     dos.write(realBuffer, i 
> * STRIPE_SIZE, STRIPE_SIZE);
> +                                             }
> +                                     }
>                               }
> +                     }
>  
> -             }
> -             finally {
> -                     for(int i = 0; i < k; i++)
> -                             Closer.close(writers[i]);
> -                     for(int i = 0; i < n; i++)
> -                             Closer.close(readers[i]);
> -             }
> +                                     
>               // Set new buckets only after have a successful decode.
>               // Note that the last data bucket will be overwritten padded.
>               for(int i = 0; i < dataBlockStatus.length; i++) {
> @@ -222,11 +216,7 @@
>               Buffer[] dataPackets = new Buffer[k];
>               Buffer[] checkPackets = new Buffer[n - k];
>               Bucket[] buckets = new Bucket[n];
> -             DataInputStream[] readers = new DataInputStream[k];
> -             OutputStream[] writers = new OutputStream[n - k];
>  
> -             try {
> -
>                       int[] toEncode = new int[n - k];
>                       int numberToEncode = 0; // can be less than n-k
>  
> @@ -250,20 +240,18 @@
>                                       else
>                                               throw new 
> IllegalArgumentException("Too big: " + sz + " bigger than " 
+ blockLength);
>                               }
> -                             readers[i] = new 
> DataInputStream(buckets[i].getInputStream());
>                       }
> -
> +                     
> +                     ArrayList existingWriters = new ArrayList();
>                       for(int i = 0; i < checkBlockStatus.length; i++) {
>                               buckets[i + k] = checkBlockStatus[i];
>                               if(buckets[i + k] == null) {
>                                       buckets[i + k] = 
> bf.makeBucket(blockLength);
> -                                     writers[i] = buckets[i + 
> k].getOutputStream();
>                                       toEncode[numberToEncode++] = i + k;
> -                             }
> -                             else
> -                                     writers[i] = null;
> +                                     existingWriters.add(new Integer(i));
> +                             }       
>                       }
> -
> +                     
>                       //                      Runtime.getRuntime().gc();
>  //                   Runtime.getRuntime().runFinalization();
>  //                   Runtime.getRuntime().gc();
> @@ -280,9 +268,11 @@
>                                       if(logMINOR)
>                                               Logger.minor(this, "Memory in 
> use before read: " + 
memUsedBeforeRead);
>                                       // Read the data in first
> -                                     for(int i = 0; i < k; i++)
> -                                             
> readers[i].readFully(realBuffer, i * STRIPE_SIZE,
> -                                                     STRIPE_SIZE);
> +                                     for(int i = 0; i < k; i++) {
> +                                             DataInputStream dis = new 
DataInputStream(buckets[i].getInputStream());
> +                                             dis.readFully(realBuffer, i * 
> STRIPE_SIZE, STRIPE_SIZE);
> +                                             Closer.close(dis);
> +                                     }
>                                       // Do the encode
>                                       // Not shuffled
>                                       long startTime = 
> System.currentTimeMillis();
> @@ -306,19 +296,18 @@
>                                               Logger.minor(this, "Stripe 
> encode took " + (endTime - startTime) 
+ "ms for k=" + k + ", n=" + n + ", stripeSize=" + STRIPE_SIZE);
>                                       // packets now contains an array of 
> decoded blocks, in order
>                                       // Write the data out
> -                                     for(int i = k; i < n; i++)
> -                                             if(writers[i - k] != null)
> -                                                     writers[i - 
> k].write(realBuffer, i * STRIPE_SIZE,
> -                                                             STRIPE_SIZE);
> +                                     
> +                                     for(int i = k; i < n; i++) {
> +                                             int index = i-k;
> +                                             
> +                                             if(existingWriters.contains(new 
> Integer(index))) {
> +                                                     OutputStream os = 
> buckets[i].getOutputStream();
> +                                                     os.write(realBuffer, i 
> * STRIPE_SIZE, STRIPE_SIZE);
> +                                                     Closer.close(os);
> +                                             }
> +                                     }
>                               }
>  
> -             }
> -             finally {
> -                     for(int i = 0; i < k; i++)
> -                             Closer.close(readers[i]);
> -                     for(int i = 0; i < n - k; i++)
> -                             Closer.close(writers[i]);
> -             }
>               // Set new buckets only after have a successful decode.
>               for(int i = 0; i < checkBlockStatus.length; i++) {
>                       Bucket data = buckets[i + k];
> 
> _______________________________________________
> cvs mailing list
> cvs at freenetproject.org
> http://emu.freenetproject.org/cgi-bin/mailman/listinfo/cvs
> 
> 
-------------- next part --------------
A non-text attachment was scrubbed...
Name: not available
Type: application/pgp-signature
Size: 189 bytes
Desc: not available
URL: 
<https://emu.freenetproject.org/pipermail/devl/attachments/20080425/035de1ef/attachment.pgp>

Reply via email to