Author: nextgens
Date: 2008-04-24 16:19:07 +0000 (Thu, 24 Apr 2008)
New Revision: 19544
Modified:
trunk/freenet/src/freenet/client/FECCodec.java
Log:
Hopefully fix #2287: Freenet uses too many fd's when starting a 2G+ insert,
results in failure to persist
Now we leave it up to the GC ASAP
Modified: trunk/freenet/src/freenet/client/FECCodec.java
===================================================================
--- trunk/freenet/src/freenet/client/FECCodec.java 2008-04-24 13:43:50 UTC
(rev 19543)
+++ trunk/freenet/src/freenet/client/FECCodec.java 2008-04-24 16:19:07 UTC
(rev 19544)
@@ -7,7 +7,6 @@
import java.io.IOException;
import java.io.OutputStream;
import java.util.LinkedList;
-import java.util.NoSuchElementException;
import com.onionnetworks.fec.FECCode;
import com.onionnetworks.util.Buffer;
@@ -20,6 +19,7 @@
import freenet.support.io.BucketTools;
import freenet.support.io.Closer;
import freenet.support.io.NativeThread;
+import java.util.ArrayList;
/**
* FEC (forward error correction) handler.
@@ -97,12 +97,10 @@
throw new IllegalArgumentException();
Buffer[] packets = new Buffer[k];
Bucket[] buckets = new Bucket[n];
- DataInputStream[] readers = new DataInputStream[n];
- OutputStream[] writers = new OutputStream[k];
int numberToDecode = 0; // can be less than n-k
+ ArrayList existingReaders = new ArrayList();
+ ArrayList existingWriters = new ArrayList();
- try {
-
byte[] realBuffer = new byte[k * STRIPE_SIZE];
int[] packetIndexes = new int[k];
@@ -114,15 +112,14 @@
for(int i = 0; i < k; i++)
packets[i] = new Buffer(realBuffer, i *
STRIPE_SIZE,
STRIPE_SIZE);
-
+
for(int i = 0; i < dataBlockStatus.length; i++) {
buckets[i] = dataBlockStatus[i].getData();
if(buckets[i] == null) {
buckets[i] = bf.makeBucket(blockLength);
- writers[i] =
buckets[i].getOutputStream();
+ existingWriters.add(new Integer(i));
if(logMINOR)
Logger.minor(this, "writers[" +
i + "] != null");
- readers[i] = null;
numberToDecode++;
}
else {
@@ -137,17 +134,14 @@
}
if(logMINOR)
Logger.minor(this, "writers[" +
i + "] = null (already filled)");
- writers[i] = null;
- readers[i] = new
DataInputStream(buckets[i].getInputStream());
+ existingReaders.add(new Integer(i));
packetIndexes[idx++] = i;
}
}
for(int i = 0; i < checkBlockStatus.length; i++) {
buckets[i + k] = checkBlockStatus[i].getData();
- if(buckets[i + k] == null)
- readers[i + k] = null;
- else {
- readers[i + k] = new
DataInputStream(buckets[i + k].getInputStream());
+ if(buckets[i + k] != null) {
+ existingReaders.add(new Integer(i+k));
if(idx < k)
packetIndexes[idx++] = i + k;
}
@@ -160,15 +154,18 @@
for(int i = 0; i < packetIndexes.length; i++)
Logger.minor(this, "[" + i + "] = " +
packetIndexes[i]);
- if(numberToDecode > 0)
+ if(numberToDecode > 0) {
// Do the (striped) decode
for(int offset = 0; offset < blockLength;
offset += STRIPE_SIZE) {
// Read the data in first
for(int i = 0; i < k; i++) {
int x = packetIndexes[i];
-
readers[x].readFully(realBuffer, i * STRIPE_SIZE,
- STRIPE_SIZE);
+ if(existingReaders.contains(new
Integer(x))) {
+ DataInputStream dis =
new DataInputStream(buckets[i].getInputStream());
+
dis.readFully(realBuffer, i * STRIPE_SIZE, STRIPE_SIZE);
+ Closer.close(dis);
+ }
}
// Do the decode
// Not shuffled
@@ -178,19 +175,16 @@
fec.decode(packets, disposableIndexes);
// packets now contains an array of
decoded blocks, in order
// Write the data out
- for(int i = 0; i < k; i++)
- if(writers[i] != null)
-
writers[i].write(realBuffer, i * STRIPE_SIZE,
- STRIPE_SIZE);
+ for(int i = 0; i < k; i++) {
+ if(existingWriters.contains(new
Integer(i))) {
+ OutputStream dos =
buckets[i].getOutputStream();
+ dos.write(realBuffer, i
* STRIPE_SIZE, STRIPE_SIZE);
+ }
+ }
}
+ }
- }
- finally {
- for(int i = 0; i < k; i++)
- Closer.close(writers[i]);
- for(int i = 0; i < n; i++)
- Closer.close(readers[i]);
- }
+
// Set new buckets only after have a successful decode.
// Note that the last data bucket will be overwritten padded.
for(int i = 0; i < dataBlockStatus.length; i++) {
@@ -222,11 +216,7 @@
Buffer[] dataPackets = new Buffer[k];
Buffer[] checkPackets = new Buffer[n - k];
Bucket[] buckets = new Bucket[n];
- DataInputStream[] readers = new DataInputStream[k];
- OutputStream[] writers = new OutputStream[n - k];
- try {
-
int[] toEncode = new int[n - k];
int numberToEncode = 0; // can be less than n-k
@@ -250,20 +240,18 @@
else
throw new
IllegalArgumentException("Too big: " + sz + " bigger than " + blockLength);
}
- readers[i] = new
DataInputStream(buckets[i].getInputStream());
}
-
+
+ ArrayList existingWriters = new ArrayList();
for(int i = 0; i < checkBlockStatus.length; i++) {
buckets[i + k] = checkBlockStatus[i];
if(buckets[i + k] == null) {
buckets[i + k] =
bf.makeBucket(blockLength);
- writers[i] = buckets[i +
k].getOutputStream();
toEncode[numberToEncode++] = i + k;
- }
- else
- writers[i] = null;
+ existingWriters.add(new Integer(i));
+ }
}
-
+
// Runtime.getRuntime().gc();
// Runtime.getRuntime().runFinalization();
// Runtime.getRuntime().gc();
@@ -280,9 +268,11 @@
if(logMINOR)
Logger.minor(this, "Memory in
use before read: " + memUsedBeforeRead);
// Read the data in first
- for(int i = 0; i < k; i++)
-
readers[i].readFully(realBuffer, i * STRIPE_SIZE,
- STRIPE_SIZE);
+ for(int i = 0; i < k; i++) {
+ DataInputStream dis = new
DataInputStream(buckets[i].getInputStream());
+ dis.readFully(realBuffer, i *
STRIPE_SIZE, STRIPE_SIZE);
+ Closer.close(dis);
+ }
// Do the encode
// Not shuffled
long startTime =
System.currentTimeMillis();
@@ -306,19 +296,18 @@
Logger.minor(this, "Stripe
encode took " + (endTime - startTime) + "ms for k=" + k + ", n=" + n + ",
stripeSize=" + STRIPE_SIZE);
// packets now contains an array of
decoded blocks, in order
// Write the data out
- for(int i = k; i < n; i++)
- if(writers[i - k] != null)
- writers[i -
k].write(realBuffer, i * STRIPE_SIZE,
- STRIPE_SIZE);
+
+ for(int i = k; i < n; i++) {
+ int index = i-k;
+
+ if(existingWriters.contains(new
Integer(index))) {
+ OutputStream os =
buckets[i].getOutputStream();
+ os.write(realBuffer, i
* STRIPE_SIZE, STRIPE_SIZE);
+ Closer.close(os);
+ }
+ }
}
- }
- finally {
- for(int i = 0; i < k; i++)
- Closer.close(readers[i]);
- for(int i = 0; i < n - k; i++)
- Closer.close(writers[i]);
- }
// Set new buckets only after have a successful decode.
for(int i = 0; i < checkBlockStatus.length; i++) {
Bucket data = buckets[i + k];