Author: nextgens
Date: 2008-04-25 08:52:15 +0000 (Fri, 25 Apr 2008)
New Revision: 19554
Modified:
trunk/freenet/src/freenet/client/FECCodec.java
Log:
revert 19545 and 19544... As toad pointed out it can't work
Modified: trunk/freenet/src/freenet/client/FECCodec.java
===================================================================
--- trunk/freenet/src/freenet/client/FECCodec.java 2008-04-25 08:30:47 UTC
(rev 19553)
+++ trunk/freenet/src/freenet/client/FECCodec.java 2008-04-25 08:52:15 UTC
(rev 19554)
@@ -7,6 +7,7 @@
import java.io.IOException;
import java.io.OutputStream;
import java.util.LinkedList;
+import java.util.NoSuchElementException;
import com.onionnetworks.fec.FECCode;
import com.onionnetworks.util.Buffer;
@@ -19,7 +20,6 @@
import freenet.support.io.BucketTools;
import freenet.support.io.Closer;
import freenet.support.io.NativeThread;
-import java.util.ArrayList;
/**
* FEC (forward error correction) handler.
@@ -31,10 +31,11 @@
public abstract class FECCodec {
// REDFLAG: Optimal stripe size? Smaller => less memory usage, but more
JNI overhead
+
private static int STRIPE_SIZE = 4096;
static boolean logMINOR;
FECCode fec;
- protected final int k, n;
+ protected final int k, n;
private final Executor executor;
protected FECCodec(Executor executor, int k, int n) {
@@ -42,7 +43,7 @@
this.k = k;
this.n = n;
}
-
+
/**
* Get a codec where we know both the number of data blocks and the
number
* of check blocks, and the codec type. Normally for decoding.
@@ -74,10 +75,11 @@
* all FEC.
*/
int checkBlocks = dataBlocks *
HighLevelSimpleClientImpl.SPLITFILE_CHECK_BLOCKS_PER_SEGMENT /
HighLevelSimpleClientImpl.SPLITFILE_BLOCKS_PER_SEGMENT;
- if(dataBlocks >=
HighLevelSimpleClientImpl.SPLITFILE_CHECK_BLOCKS_PER_SEGMENT)
+ if(dataBlocks >=
HighLevelSimpleClientImpl.SPLITFILE_CHECK_BLOCKS_PER_SEGMENT)
checkBlocks =
HighLevelSimpleClientImpl.SPLITFILE_CHECK_BLOCKS_PER_SEGMENT;
return StandardOnionFECCodec.getInstance(dataBlocks,
checkBlocks, executor);
- } else
+ }
+ else
return null;
}
@@ -95,90 +97,100 @@
throw new IllegalArgumentException();
Buffer[] packets = new Buffer[k];
Bucket[] buckets = new Bucket[n];
+ DataInputStream[] readers = new DataInputStream[n];
+ OutputStream[] writers = new OutputStream[k];
int numberToDecode = 0; // can be less than n-k
- ArrayList existingReaders = new ArrayList();
- ArrayList existingWriters = new ArrayList();
- byte[] realBuffer = new byte[k * STRIPE_SIZE];
+ try {
- int[] packetIndexes = new int[k];
- for(int i = 0; i < packetIndexes.length; i++)
- packetIndexes[i] = -1;
+ byte[] realBuffer = new byte[k * STRIPE_SIZE];
- int idx = 0;
+ int[] packetIndexes = new int[k];
+ for(int i = 0; i < packetIndexes.length; i++)
+ packetIndexes[i] = -1;
- for(int i = 0; i < k; i++)
- packets[i] = new Buffer(realBuffer, i * STRIPE_SIZE,
STRIPE_SIZE);
+ int idx = 0;
- for(int i = 0; i < dataBlockStatus.length; i++) {
- buckets[i] = dataBlockStatus[i].getData();
- if(buckets[i] == null) {
- buckets[i] = bf.makeBucket(blockLength);
- existingWriters.add(new Integer(i));
- if(logMINOR)
- Logger.minor(this, "writers[" + i + "]
!= null");
- numberToDecode++;
- } else {
- long sz = buckets[i].size();
- if(sz < blockLength) {
- if(i != dataBlockStatus.length - 1)
- throw new
IllegalArgumentException("All buckets except the last must be the full size but
data bucket " + i + " of " + dataBlockStatus.length + " (" + dataBlockStatus[i]
+ ") is " + sz + " not " + blockLength);
- if(sz < blockLength)
- buckets[i] =
BucketTools.pad(buckets[i], blockLength, bf, (int) sz);
- else
- throw new
IllegalArgumentException("Too big: " + sz + " bigger than " + blockLength);
+ for(int i = 0; i < k; i++)
+ packets[i] = new Buffer(realBuffer, i *
STRIPE_SIZE,
+ STRIPE_SIZE);
+
+ for(int i = 0; i < dataBlockStatus.length; i++) {
+ buckets[i] = dataBlockStatus[i].getData();
+ if(buckets[i] == null) {
+ buckets[i] = bf.makeBucket(blockLength);
+ writers[i] =
buckets[i].getOutputStream();
+ if(logMINOR)
+ Logger.minor(this, "writers[" +
i + "] != null");
+ readers[i] = null;
+ numberToDecode++;
}
- if(logMINOR)
- Logger.minor(this, "writers[" + i + "]
= null (already filled)");
- existingReaders.add(new Integer(i));
- packetIndexes[idx++] = i;
+ else {
+ long sz = buckets[i].size();
+ if(sz < blockLength) {
+ if(i != dataBlockStatus.length
- 1)
+ throw new
IllegalArgumentException("All buckets except the last must be the full size but
data bucket " + i + " of " + dataBlockStatus.length + " (" + dataBlockStatus[i]
+ ") is " + sz + " not " + blockLength);
+ if(sz < blockLength)
+ buckets[i] =
BucketTools.pad(buckets[i], blockLength, bf, (int) sz);
+ else
+ throw new
IllegalArgumentException("Too big: " + sz + " bigger than " + blockLength);
+ }
+ if(logMINOR)
+ Logger.minor(this, "writers[" +
i + "] = null (already filled)");
+ writers[i] = null;
+ readers[i] = new
DataInputStream(buckets[i].getInputStream());
+ packetIndexes[idx++] = i;
+ }
}
- }
- for(int i = 0; i < checkBlockStatus.length; i++) {
- buckets[i + k] = checkBlockStatus[i].getData();
- if(buckets[i + k] != null) {
- existingReaders.add(new Integer(i + k));
- if(idx < k)
- packetIndexes[idx++] = i + k;
+ for(int i = 0; i < checkBlockStatus.length; i++) {
+ buckets[i + k] = checkBlockStatus[i].getData();
+ if(buckets[i + k] == null)
+ readers[i + k] = null;
+ else {
+ readers[i + k] = new
DataInputStream(buckets[i + k].getInputStream());
+ if(idx < k)
+ packetIndexes[idx++] = i + k;
+ }
}
- }
- if(idx < k)
- throw new IllegalArgumentException("Must have at least
k packets (k=" + k + ",idx=" + idx + ')');
+ if(idx < k)
+ throw new IllegalArgumentException("Must have
at least k packets (k=" + k + ",idx=" + idx + ')');
- if(logMINOR)
- for(int i = 0; i < packetIndexes.length; i++)
- Logger.minor(this, "[" + i + "] = " +
packetIndexes[i]);
+ if(logMINOR)
+ for(int i = 0; i < packetIndexes.length; i++)
+ Logger.minor(this, "[" + i + "] = " +
packetIndexes[i]);
- if(numberToDecode > 0)
- // Do the (striped) decode
- for(int offset = 0; offset < blockLength; offset +=
STRIPE_SIZE) {
- // Read the data in first
- for(int i = 0; i < k; i++) {
- int x = packetIndexes[i];
- if(existingReaders.contains(new
Integer(x))) {
- DataInputStream dis = new
DataInputStream(buckets[i].getInputStream());
- dis.readFully(realBuffer, i *
STRIPE_SIZE, STRIPE_SIZE);
- Closer.close(dis);
+ if(numberToDecode > 0)
+ // Do the (striped) decode
+
+ for(int offset = 0; offset < blockLength;
offset += STRIPE_SIZE) {
+ // Read the data in first
+ for(int i = 0; i < k; i++) {
+ int x = packetIndexes[i];
+
readers[x].readFully(realBuffer, i * STRIPE_SIZE,
+ STRIPE_SIZE);
}
+ // Do the decode
+ // Not shuffled
+ int[] disposableIndexes = new
int[packetIndexes.length];
+ System.arraycopy(packetIndexes, 0,
disposableIndexes, 0,
+ packetIndexes.length);
+ fec.decode(packets, disposableIndexes);
+ // packets now contains an array of
decoded blocks, in order
+ // Write the data out
+ for(int i = 0; i < k; i++)
+ if(writers[i] != null)
+
writers[i].write(realBuffer, i * STRIPE_SIZE,
+ STRIPE_SIZE);
}
- // Do the decode
- // Not shuffled
- int[] disposableIndexes = new
int[packetIndexes.length];
- System.arraycopy(packetIndexes, 0,
disposableIndexes, 0,
- packetIndexes.length);
- fec.decode(packets, disposableIndexes);
- // packets now contains an array of decoded
blocks, in order
- // Write the data out
- for(int i = 0; i < k; i++) {
- if(existingWriters.contains(new
Integer(i))) {
- OutputStream dos =
buckets[i].getOutputStream();
- dos.write(realBuffer, i *
STRIPE_SIZE, STRIPE_SIZE);
- }
- }
- }
-
+ }
+ finally {
+ for(int i = 0; i < k; i++)
+ Closer.close(writers[i]);
+ for(int i = 0; i < n; i++)
+ Closer.close(readers[i]);
+ }
// Set new buckets only after have a successful decode.
// Note that the last data bucket will be overwritten padded.
for(int i = 0; i < dataBlockStatus.length; i++) {
@@ -210,95 +222,103 @@
Buffer[] dataPackets = new Buffer[k];
Buffer[] checkPackets = new Buffer[n - k];
Bucket[] buckets = new Bucket[n];
+ DataInputStream[] readers = new DataInputStream[k];
+ OutputStream[] writers = new OutputStream[n - k];
- int[] toEncode = new int[n - k];
- int numberToEncode = 0; // can be less than n-k
+ try {
- byte[] realBuffer = new byte[n * STRIPE_SIZE];
+ int[] toEncode = new int[n - k];
+ int numberToEncode = 0; // can be less than n-k
- for(int i = 0; i < k; i++)
- dataPackets[i] = new Buffer(realBuffer, i *
STRIPE_SIZE, STRIPE_SIZE);
- for(int i = 0; i < n - k; i++)
- checkPackets[i] = new Buffer(realBuffer, (i + k) *
STRIPE_SIZE, STRIPE_SIZE);
+ byte[] realBuffer = new byte[n * STRIPE_SIZE];
- for(int i = 0; i < dataBlockStatus.length; i++) {
- buckets[i] = dataBlockStatus[i];
- long sz = buckets[i].size();
- if(sz < blockLength) {
- if(i != dataBlockStatus.length - 1)
- throw new IllegalArgumentException("All
buckets except the last must be the full size");
- if(sz < blockLength)
- buckets[i] =
BucketTools.pad(buckets[i], blockLength, bf, (int) sz);
- else
- throw new IllegalArgumentException("Too
big: " + sz + " bigger than " + blockLength);
+ for(int i = 0; i < k; i++)
+ dataPackets[i] = new Buffer(realBuffer, i *
STRIPE_SIZE,
+ STRIPE_SIZE);
+ for(int i = 0; i < n - k; i++)
+ checkPackets[i] = new Buffer(realBuffer, (i +
k) * STRIPE_SIZE,
+ STRIPE_SIZE);
+
+ for(int i = 0; i < dataBlockStatus.length; i++) {
+ buckets[i] = dataBlockStatus[i];
+ long sz = buckets[i].size();
+ if(sz < blockLength) {
+ if(i != dataBlockStatus.length - 1)
+ throw new
IllegalArgumentException("All buckets except the last must be the full size");
+ if(sz < blockLength)
+ buckets[i] =
BucketTools.pad(buckets[i], blockLength, bf, (int) sz);
+ else
+ throw new
IllegalArgumentException("Too big: " + sz + " bigger than " + blockLength);
+ }
+ readers[i] = new
DataInputStream(buckets[i].getInputStream());
}
- }
- ArrayList existingWriters = new ArrayList();
- for(int i = 0; i < checkBlockStatus.length; i++) {
- buckets[i + k] = checkBlockStatus[i];
- if(buckets[i + k] == null) {
- buckets[i + k] = bf.makeBucket(blockLength);
- toEncode[numberToEncode++] = i + k;
- existingWriters.add(new Integer(i));
+ for(int i = 0; i < checkBlockStatus.length; i++) {
+ buckets[i + k] = checkBlockStatus[i];
+ if(buckets[i + k] == null) {
+ buckets[i + k] =
bf.makeBucket(blockLength);
+ writers[i] = buckets[i +
k].getOutputStream();
+ toEncode[numberToEncode++] = i + k;
+ }
+ else
+ writers[i] = null;
}
- }
- // Runtime.getRuntime().gc();
+ // Runtime.getRuntime().gc();
// Runtime.getRuntime().runFinalization();
// Runtime.getRuntime().gc();
// Runtime.getRuntime().runFinalization();
- long memUsedBeforeEncodes = Runtime.getRuntime().totalMemory()
- Runtime.getRuntime().freeMemory();
- if(logMINOR)
- Logger.minor(this, "Memory in use before encodes: " +
memUsedBeforeEncodes);
+ long memUsedBeforeEncodes =
Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory();
+ if(logMINOR)
+ Logger.minor(this, "Memory in use before
encodes: " + memUsedBeforeEncodes);
- if(numberToEncode > 0)
- // Do the (striped) encode
- for(int offset = 0; offset < blockLength; offset +=
STRIPE_SIZE) {
- long memUsedBeforeRead =
Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory();
- if(logMINOR)
- Logger.minor(this, "Memory in use
before read: " + memUsedBeforeRead);
- // Read the data in first
- for(int i = 0; i < k; i++) {
- DataInputStream dis = new
DataInputStream(buckets[i].getInputStream());
- dis.readFully(realBuffer, i *
STRIPE_SIZE, STRIPE_SIZE);
- Closer.close(dis);
- }
- // Do the encode
- // Not shuffled
- long startTime = System.currentTimeMillis();
- //
Runtime.getRuntime().gc();
+ if(numberToEncode > 0)
+ // Do the (striped) encode
+
+ for(int offset = 0; offset < blockLength;
offset += STRIPE_SIZE) {
+ long memUsedBeforeRead =
Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory();
+ if(logMINOR)
+ Logger.minor(this, "Memory in
use before read: " + memUsedBeforeRead);
+ // Read the data in first
+ for(int i = 0; i < k; i++)
+
readers[i].readFully(realBuffer, i * STRIPE_SIZE,
+ STRIPE_SIZE);
+ // Do the encode
+ // Not shuffled
+ long startTime =
System.currentTimeMillis();
+ //
Runtime.getRuntime().gc();
// Runtime.getRuntime().runFinalization();
// Runtime.getRuntime().gc();
// Runtime.getRuntime().runFinalization();
- long memUsedBeforeStripe =
Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory();
- if(logMINOR)
- Logger.minor(this, "Memory in use
before stripe: " + memUsedBeforeStripe);
- fec.encode(dataPackets, checkPackets, toEncode);
- //
Runtime.getRuntime().gc();
+ long memUsedBeforeStripe =
Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory();
+ if(logMINOR)
+ Logger.minor(this, "Memory in
use before stripe: " + memUsedBeforeStripe);
+ fec.encode(dataPackets, checkPackets,
toEncode);
+ //
Runtime.getRuntime().gc();
// Runtime.getRuntime().runFinalization();
// Runtime.getRuntime().gc();
// Runtime.getRuntime().runFinalization();
- long memUsedAfterStripe =
Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory();
- if(logMINOR)
- Logger.minor(this, "Memory in use after
stripe: " + memUsedAfterStripe);
- long endTime = System.currentTimeMillis();
- if(logMINOR)
- Logger.minor(this, "Stripe encode took
" + (endTime - startTime) + "ms for k=" + k + ", n=" + n + ", stripeSize=" +
STRIPE_SIZE);
- // packets now contains an array of decoded
blocks, in order
- // Write the data out
-
- for(int i = k; i < n; i++) {
- int index = i - k;
-
- if(existingWriters.contains(new
Integer(index))) {
- OutputStream os =
buckets[i].getOutputStream();
- os.write(realBuffer, i *
STRIPE_SIZE, STRIPE_SIZE);
- Closer.close(os);
- }
+ long memUsedAfterStripe =
Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory();
+ if(logMINOR)
+ Logger.minor(this, "Memory in
use after stripe: " + memUsedAfterStripe);
+ long endTime =
System.currentTimeMillis();
+ if(logMINOR)
+ Logger.minor(this, "Stripe
encode took " + (endTime - startTime) + "ms for k=" + k + ", n=" + n + ",
stripeSize=" + STRIPE_SIZE);
+ // packets now contains an array of
decoded blocks, in order
+ // Write the data out
+ for(int i = k; i < n; i++)
+ if(writers[i - k] != null)
+ writers[i -
k].write(realBuffer, i * STRIPE_SIZE,
+ STRIPE_SIZE);
}
- }
+ }
+ finally {
+ for(int i = 0; i < k; i++)
+ Closer.close(readers[i]);
+ for(int i = 0; i < n - k; i++)
+ Closer.close(writers[i]);
+ }
// Set new buckets only after have a successful decode.
for(int i = 0; i < checkBlockStatus.length; i++) {
Bucket data = buckets[i + k];
@@ -324,7 +344,7 @@
synchronized(_awaitingJobs) {
_awaitingJobs.addFirst(job);
if(runningFECThreads < maxThreads) {
- executor.execute(fecRunner, "FEC Pool " +
fecPoolCounter++);
+ executor.execute(fecRunner, "FEC Pool
"+fecPoolCounter++);
runningFECThreads++;
}
_awaitingJobs.notifyAll();
@@ -336,35 +356,35 @@
private static final FECRunner fecRunner = new FECRunner();
private static int runningFECThreads;
private static int fecPoolCounter;
-
+
private synchronized static int getMaxRunningFECThreads() {
long now = System.currentTimeMillis();
- if(now - lastPolledMaxRunningFECThreads < 5 * 60 * 1000)
- return maxRunningFECThreads;
+ if(now - lastPolledMaxRunningFECThreads < 5*60*1000) return
maxRunningFECThreads;
String osName = System.getProperty("os.name");
- if(osName.indexOf("Windows") == -1 &&
(osName.toLowerCase().indexOf("mac os x") > 0) ||
(!NativeThread.usingNativeCode()))
+ if(osName.indexOf("Windows") == -1 &&
(osName.toLowerCase().indexOf("mac os x") > 0) ||
(!NativeThread.usingNativeCode())) {
// OS/X niceness is really weak, so we don't want any
more background CPU load than necessary
// Also, on non-Windows, we need the native threads
library to be working.
maxRunningFECThreads = 1;
- else {
+ } else {
// Most other OSs will have reasonable niceness, so go
by RAM.
Runtime r = Runtime.getRuntime();
int max = r.availableProcessors(); // FIXME this may
change in a VM, poll it
long maxMemory = r.maxMemory();
- if(maxMemory < 256 * 1024 * 1024)
+ if(maxMemory < 256*1024*1024) {
max = 1;
- else
+ } else {
// Measured 11MB decode 8MB encode on amd64.
// No more than 10% of available RAM, so 110MB
for each extra processor.
- max = Math.min(max, (int)
(Math.min(Integer.MAX_VALUE, maxMemory / (128 * 1024 * 1024))));
+ max = Math.min(max, (int)
(Math.min(Integer.MAX_VALUE, maxMemory / (128*1024*1024))));
+ }
maxRunningFECThreads = max;
}
- Logger.minor(FECCodec.class, "Maximum FEC threads: " +
maxRunningFECThreads);
+ Logger.minor(FECCodec.class, "Maximum FEC threads:
"+maxRunningFECThreads);
return maxRunningFECThreads;
}
+
private static int maxRunningFECThreads;
private static int lastPolledMaxRunningFECThreads = -1;
-
static {
getMaxRunningFECThreads();
}
@@ -383,46 +403,46 @@
try {
while(true) {
FECJob job = null;
- // Get a job
- synchronized(_awaitingJobs) {
- while(_awaitingJobs.isEmpty()) {
-
_awaitingJobs.wait(Integer.MAX_VALUE);
+ // Get a job
+ synchronized(_awaitingJobs) {
+ while
(_awaitingJobs.isEmpty())
+
_awaitingJobs.wait(Integer.MAX_VALUE);
+ job = (FECJob)
_awaitingJobs.removeLast();
}
- job = (FECJob)
_awaitingJobs.removeLast();
- }
- // Encode it
- try {
- if(job.isADecodingJob)
-
job.codec.realDecode(job.dataBlockStatus, job.checkBlockStatus,
job.blockLength, job.bucketFactory);
- else {
-
job.codec.realEncode(job.dataBlocks, job.checkBlocks, job.blockLength,
job.bucketFactory);
- // Update
SplitFileBlocks from buckets if necessary
- if((job.dataBlockStatus
!= null) || (job.checkBlockStatus != null)) {
- for(int i = 0;
i < job.dataBlocks.length; i++)
-
job.dataBlockStatus[i].setData(job.dataBlocks[i]);
- for(int i = 0;
i < job.checkBlocks.length; i++)
-
job.checkBlockStatus[i].setData(job.checkBlocks[i]);
+ // Encode it
+ try {
+ if(job.isADecodingJob)
+
job.codec.realDecode(job.dataBlockStatus, job.checkBlockStatus,
job.blockLength, job.bucketFactory);
+ else {
+
job.codec.realEncode(job.dataBlocks, job.checkBlocks, job.blockLength,
job.bucketFactory);
+ // Update
SplitFileBlocks from buckets if necessary
+
if((job.dataBlockStatus != null) || (job.checkBlockStatus != null)) {
+ for(int
i = 0; i < job.dataBlocks.length; i++)
+
job.dataBlockStatus[i].setData(job.dataBlocks[i]);
+ for(int
i = 0; i < job.checkBlocks.length; i++)
+
job.checkBlockStatus[i].setData(job.checkBlocks[i]);
+ }
}
+ } catch(IOException e) {
+ Logger.error(this,
"BOH! ioe:" + e.getMessage());
}
- } catch(IOException e) {
- Logger.error(this, "BOH! ioe:"
+ e.getMessage());
- }
- // Call the callback
- try {
- if(job.isADecodingJob)
-
job.callback.onDecodedSegment();
- else
-
job.callback.onEncodedSegment();
+ // Call the callback
+ try {
+ if(job.isADecodingJob)
+
job.callback.onDecodedSegment();
+ else
+
job.callback.onEncodedSegment();
- } catch(Throwable e) {
- Logger.error(this, "The
callback failed!" + e.getMessage(), e);
- }
+ } catch(Throwable e) {
+ Logger.error(this, "The
callback failed!" + e.getMessage(), e);
+ }
}
- } catch(Throwable t) {
- Logger.error(this, "Caught " + t + " in " +
this, t);
- } finally {
+ } catch (Throwable t) {
+ Logger.error(this, "Caught "+t+" in "+this, t);
+ }
+ finally {
synchronized(FECCodec.class) {
runningFECThreads--;
}