On Thursday 24 April 2008 17:19, nextgens at freenetproject.org wrote:
> Author: nextgens
> Date: 2008-04-24 16:19:07 +0000 (Thu, 24 Apr 2008)
> New Revision: 19544
>
> Modified:
> trunk/freenet/src/freenet/client/FECCodec.java
> Log:
> Hopefully fix #2287: Freenet uses too many fd's when starting a 2G+ insert,
results in failure to persist
>
> Now we leave it up to the GC ASAP
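This isn't going to work. Bucket.getOutputStream() doesn't append - at least,
that's always been my assumption - and Bucket.getInputStream() *ALWAYS* starts
at zero. So reopening the streams for every stripe means each read returns the
first stripe again and each write replaces what was written before. I expect
this commit to seriously break things - did you test it?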
It is conceivable that we'd run out of fd's on a quad core system (192 fd's
per encode/decode, 4 at once), but IMHO it's unlikely - if we're running out
of fd's, most likely there's a leak, surely?
>
> Modified: trunk/freenet/src/freenet/client/FECCodec.java
> ===================================================================
> --- trunk/freenet/src/freenet/client/FECCodec.java 2008-04-24 13:43:50 UTC
(rev 19543)
> +++ trunk/freenet/src/freenet/client/FECCodec.java 2008-04-24 16:19:07 UTC
(rev 19544)
> @@ -7,7 +7,6 @@
> import java.io.IOException;
> import java.io.OutputStream;
> import java.util.LinkedList;
> -import java.util.NoSuchElementException;
>
> import com.onionnetworks.fec.FECCode;
> import com.onionnetworks.util.Buffer;
> @@ -20,6 +19,7 @@
> import freenet.support.io.BucketTools;
> import freenet.support.io.Closer;
> import freenet.support.io.NativeThread;
> +import java.util.ArrayList;
>
> /**
> * FEC (forward error correction) handler.
> @@ -97,12 +97,10 @@
> throw new IllegalArgumentException();
> Buffer[] packets = new Buffer[k];
> Bucket[] buckets = new Bucket[n];
> - DataInputStream[] readers = new DataInputStream[n];
> - OutputStream[] writers = new OutputStream[k];
> int numberToDecode = 0; // can be less than n-k
> + ArrayList existingReaders = new ArrayList();
> + ArrayList existingWriters = new ArrayList();
>
> - try {
> -
> byte[] realBuffer = new byte[k * STRIPE_SIZE];
>
> int[] packetIndexes = new int[k];
> @@ -114,15 +112,14 @@
> for(int i = 0; i < k; i++)
> packets[i] = new Buffer(realBuffer, i *
> STRIPE_SIZE,
> STRIPE_SIZE);
> -
> +
> for(int i = 0; i < dataBlockStatus.length; i++) {
> buckets[i] = dataBlockStatus[i].getData();
> if(buckets[i] == null) {
> buckets[i] = bf.makeBucket(blockLength);
> - writers[i] =
> buckets[i].getOutputStream();
> + existingWriters.add(new Integer(i));
> if(logMINOR)
> Logger.minor(this, "writers[" +
> i + "] != null");
> - readers[i] = null;
> numberToDecode++;
> }
> else {
> @@ -137,17 +134,14 @@
> }
> if(logMINOR)
> Logger.minor(this, "writers[" +
> i + "] = null (already filled)");
> - writers[i] = null;
> - readers[i] = new
> DataInputStream(buckets[i].getInputStream());
> + existingReaders.add(new Integer(i));
> packetIndexes[idx++] = i;
> }
> }
> for(int i = 0; i < checkBlockStatus.length; i++) {
> buckets[i + k] = checkBlockStatus[i].getData();
> - if(buckets[i + k] == null)
> - readers[i + k] = null;
> - else {
> - readers[i + k] = new
> DataInputStream(buckets[i + k].getInputStream());
> + if(buckets[i + k] != null) {
> + existingReaders.add(new Integer(i+k));
> if(idx < k)
> packetIndexes[idx++] = i + k;
> }
> @@ -160,15 +154,18 @@
> for(int i = 0; i < packetIndexes.length; i++)
> Logger.minor(this, "[" + i + "] = " +
> packetIndexes[i]);
>
> - if(numberToDecode > 0)
> + if(numberToDecode > 0) {
> // Do the (striped) decode
>
> for(int offset = 0; offset < blockLength;
> offset += STRIPE_SIZE) {
> // Read the data in first
> for(int i = 0; i < k; i++) {
> int x = packetIndexes[i];
> -
> readers[x].readFully(realBuffer, i * STRIPE_SIZE,
> - STRIPE_SIZE);
> + if(existingReaders.contains(new
> Integer(x))) {
> + DataInputStream dis =
> new
DataInputStream(buckets[i].getInputStream());
> +
> dis.readFully(realBuffer, i * STRIPE_SIZE, STRIPE_SIZE);
> + Closer.close(dis);
> + }
> }
> // Do the decode
> // Not shuffled
> @@ -178,19 +175,16 @@
> fec.decode(packets, disposableIndexes);
> // packets now contains an array of
> decoded blocks, in order
> // Write the data out
> - for(int i = 0; i < k; i++)
> - if(writers[i] != null)
> -
> writers[i].write(realBuffer, i * STRIPE_SIZE,
> - STRIPE_SIZE);
> + for(int i = 0; i < k; i++) {
> + if(existingWriters.contains(new
> Integer(i))) {
> + OutputStream dos =
> buckets[i].getOutputStream();
> + dos.write(realBuffer, i
> * STRIPE_SIZE, STRIPE_SIZE);
> + }
> + }
> }
> + }
>
> - }
> - finally {
> - for(int i = 0; i < k; i++)
> - Closer.close(writers[i]);
> - for(int i = 0; i < n; i++)
> - Closer.close(readers[i]);
> - }
> +
> // Set new buckets only after have a successful decode.
> // Note that the last data bucket will be overwritten padded.
> for(int i = 0; i < dataBlockStatus.length; i++) {
> @@ -222,11 +216,7 @@
> Buffer[] dataPackets = new Buffer[k];
> Buffer[] checkPackets = new Buffer[n - k];
> Bucket[] buckets = new Bucket[n];
> - DataInputStream[] readers = new DataInputStream[k];
> - OutputStream[] writers = new OutputStream[n - k];
>
> - try {
> -
> int[] toEncode = new int[n - k];
> int numberToEncode = 0; // can be less than n-k
>
> @@ -250,20 +240,18 @@
> else
> throw new
> IllegalArgumentException("Too big: " + sz + " bigger than "
+ blockLength);
> }
> - readers[i] = new
> DataInputStream(buckets[i].getInputStream());
> }
> -
> +
> + ArrayList existingWriters = new ArrayList();
> for(int i = 0; i < checkBlockStatus.length; i++) {
> buckets[i + k] = checkBlockStatus[i];
> if(buckets[i + k] == null) {
> buckets[i + k] =
> bf.makeBucket(blockLength);
> - writers[i] = buckets[i +
> k].getOutputStream();
> toEncode[numberToEncode++] = i + k;
> - }
> - else
> - writers[i] = null;
> + existingWriters.add(new Integer(i));
> + }
> }
> -
> +
> // Runtime.getRuntime().gc();
> // Runtime.getRuntime().runFinalization();
> // Runtime.getRuntime().gc();
> @@ -280,9 +268,11 @@
> if(logMINOR)
> Logger.minor(this, "Memory in
> use before read: " +
memUsedBeforeRead);
> // Read the data in first
> - for(int i = 0; i < k; i++)
> -
> readers[i].readFully(realBuffer, i * STRIPE_SIZE,
> - STRIPE_SIZE);
> + for(int i = 0; i < k; i++) {
> + DataInputStream dis = new
DataInputStream(buckets[i].getInputStream());
> + dis.readFully(realBuffer, i *
> STRIPE_SIZE, STRIPE_SIZE);
> + Closer.close(dis);
> + }
> // Do the encode
> // Not shuffled
> long startTime =
> System.currentTimeMillis();
> @@ -306,19 +296,18 @@
> Logger.minor(this, "Stripe
> encode took " + (endTime - startTime)
+ "ms for k=" + k + ", n=" + n + ", stripeSize=" + STRIPE_SIZE);
> // packets now contains an array of
> decoded blocks, in order
> // Write the data out
> - for(int i = k; i < n; i++)
> - if(writers[i - k] != null)
> - writers[i -
> k].write(realBuffer, i * STRIPE_SIZE,
> - STRIPE_SIZE);
> +
> + for(int i = k; i < n; i++) {
> + int index = i-k;
> +
> + if(existingWriters.contains(new
> Integer(index))) {
> + OutputStream os =
> buckets[i].getOutputStream();
> + os.write(realBuffer, i
> * STRIPE_SIZE, STRIPE_SIZE);
> + Closer.close(os);
> + }
> + }
> }
>
> - }
> - finally {
> - for(int i = 0; i < k; i++)
> - Closer.close(readers[i]);
> - for(int i = 0; i < n - k; i++)
> - Closer.close(writers[i]);
> - }
> // Set new buckets only after have a successful decode.
> for(int i = 0; i < checkBlockStatus.length; i++) {
> Bucket data = buckets[i + k];
>
> _______________________________________________
> cvs mailing list
> cvs at freenetproject.org
> http://emu.freenetproject.org/cgi-bin/mailman/listinfo/cvs
>
>
-------------- next part --------------
A non-text attachment was scrubbed...
Name: not available
Type: application/pgp-signature
Size: 189 bytes
Desc: not available
URL:
<https://emu.freenetproject.org/pipermail/devl/attachments/20080425/035de1ef/attachment.pgp>