Author: nextgens Date: 2007-04-14 01:01:06 +0000 (Sat, 14 Apr 2007) New Revision: 12675
Modified: trunk/freenet/src/freenet/client/FECCodec.java trunk/freenet/src/freenet/client/StandardOnionFECCodec.java Log: Move StandardOnionFecCodec.real(Encode|Decode) in FECCodec ... First step for #1291 Not tested. Modified: trunk/freenet/src/freenet/client/FECCodec.java =================================================================== --- trunk/freenet/src/freenet/client/FECCodec.java 2007-04-14 00:28:40 UTC (rev 12674) +++ trunk/freenet/src/freenet/client/FECCodec.java 2007-04-14 01:01:06 UTC (rev 12675) @@ -3,6 +3,18 @@ * http://www.gnu.org/ for further details of the GPL. */ package freenet.client; +import java.io.DataInputStream; +import java.io.IOException; +import java.io.OutputStream; + +import com.onionnetworks.fec.FECCode; +import com.onionnetworks.util.Buffer; + +import freenet.support.Logger; +import freenet.support.api.Bucket; +import freenet.support.api.BucketFactory; +import freenet.support.io.BucketTools; + /** * FEC (forward error correction) handler. * I didn't keep the old freenet.client.FEC* etc as it seemed grossly overengineered with @@ -11,7 +23,15 @@ * */ public abstract class FECCodec { + + static boolean logMINOR; + // REDFLAG: Optimal stripe size? Smaller => less memory usage, but more JNI overhead + private static int STRIPE_SIZE = 4096; + FECCode fec; + + int k, n; + /** * Get a codec where we know both the number of data blocks and the number * of check blocks, and the codec type. Normally for decoding. @@ -46,4 +66,266 @@ /** Queue an asynchronous encode or decode job */ public abstract void addToQueue(FECJob job); + + protected void realDecode(SplitfileBlock[] dataBlockStatus, SplitfileBlock[] checkBlockStatus, int blockLength, BucketFactory bf) throws IOException { + if(logMINOR) + Logger.minor(this, "Doing decode: " + dataBlockStatus.length + + " data blocks, " + checkBlockStatus.length + + " check blocks, block length " + blockLength + " with " + + this, new Exception("debug")); + if (dataBlockStatus.length + checkBlockStatus.length != n) + throw new IllegalArgumentException(); + if (dataBlockStatus.length != k) + throw new IllegalArgumentException(); + Buffer[] packets = new Buffer[k]; + Bucket[] buckets = new Bucket[n]; + DataInputStream[] readers = new DataInputStream[n]; + OutputStream[] writers = new OutputStream[k]; + int numberToDecode = 0; // can be less than n-k + + try { + + byte[] realBuffer = new byte[k * STRIPE_SIZE]; + + int[] packetIndexes = new int[k]; + for (int i = 0; i < packetIndexes.length; i++) + packetIndexes[i] = -1; + + int idx = 0; + + for (int i = 0; i < k; i++) + packets[i] = new Buffer(realBuffer, i * STRIPE_SIZE, + STRIPE_SIZE); + + for (int i = 0; i < dataBlockStatus.length; i++) { + buckets[i] = dataBlockStatus[i].getData(); + if (buckets[i] == null) { + buckets[i] = bf.makeBucket(blockLength); + writers[i] = buckets[i].getOutputStream(); + if(logMINOR) Logger.minor(this, "writers[" + i + "] != null"); + readers[i] = null; + numberToDecode++; + } else { + long sz = buckets[i].size(); + if (sz < blockLength) { + if (i != dataBlockStatus.length - 1) + throw new IllegalArgumentException( + "All buckets except the last must be the full size but data bucket " + + i + " of " + + dataBlockStatus.length + " (" + + dataBlockStatus[i] + ") is " + sz + + " not " + blockLength); + if (sz < blockLength) + buckets[i] = BucketTools.pad(buckets[i], blockLength, bf,(int) sz); + else + throw new IllegalArgumentException("Too big: " + sz + + " bigger than " + blockLength); + } + if(logMINOR) Logger.minor(this, "writers[" + i + "] = null (already filled)"); + writers[i] = null; + readers[i] = new DataInputStream(buckets[i] + .getInputStream()); + packetIndexes[idx++] = i; + } + } + for (int i = 0; i < checkBlockStatus.length; i++) { + buckets[i + k] = checkBlockStatus[i].getData(); + if (buckets[i + k] == null) { + readers[i + k] = null; + } else { + readers[i + k] = new DataInputStream(buckets[i + k] + .getInputStream()); + if (idx < k) + packetIndexes[idx++] = i + k; + } + } + + if (idx < k) + throw new IllegalArgumentException( + "Must have at least k packets (k="+k+",idx="+idx+ ')'); + + if(logMINOR) for (int i = 0; i < packetIndexes.length; i++) + Logger.minor(this, "[" + i + "] = " + packetIndexes[i]); + + if (numberToDecode > 0) { + // Do the (striped) decode + for (int offset = 0; offset < blockLength; offset += STRIPE_SIZE) { + // Read the data in first + for (int i = 0; i < k; i++) { + int x = packetIndexes[i]; + readers[x].readFully(realBuffer, i * STRIPE_SIZE, + STRIPE_SIZE); + } + // Do the decode + // Not shuffled + int[] disposableIndexes = new int[packetIndexes.length]; + System.arraycopy(packetIndexes, 0, disposableIndexes, 0, + packetIndexes.length); + fec.decode(packets, disposableIndexes); + // packets now contains an array of decoded blocks, in order + // Write the data out + for (int i = 0; i < k; i++) { + if (writers[i] != null) + writers[i].write(realBuffer, i * STRIPE_SIZE, + STRIPE_SIZE); + } + } + } + + } finally { + + for (int i = 0; i < k; i++) { + if (writers[i] != null) + writers[i].close(); + } + for (int i = 0; i < n; i++) { + if (readers[i] != null) + readers[i].close(); + } + + } + // Set new buckets only after have a successful decode. + // Note that the last data bucket will be overwritten padded. + for (int i = 0; i < dataBlockStatus.length; i++) { + Bucket data = buckets[i]; + if (data.size() != blockLength) + throw new IllegalStateException("Block " + i + ": " + data + + " : " + dataBlockStatus[i] + " length " + data.size()); + dataBlockStatus[i].setData(data); + } + } + + /** + * Do the actual encode. + */ + protected void realEncode(Bucket[] dataBlockStatus, + Bucket[] checkBlockStatus, int blockLength, BucketFactory bf) + throws IOException { +// Runtime.getRuntime().gc(); +// Runtime.getRuntime().runFinalization(); +// Runtime.getRuntime().gc(); +// Runtime.getRuntime().runFinalization(); + long memUsedAtStart = Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory(); + if(logMINOR) { + Logger.minor(this, "Memory in use at start: "+memUsedAtStart+" max="+Runtime.getRuntime().maxMemory()); + Logger.minor(this, "Doing encode: " + dataBlockStatus.length + + " data blocks, " + checkBlockStatus.length + + " check blocks, block length " + blockLength + " with " + + this); + } + if ((dataBlockStatus.length + checkBlockStatus.length != n) || + (dataBlockStatus.length != k)) + throw new IllegalArgumentException("Data blocks: "+dataBlockStatus.length+", Check blocks: "+checkBlockStatus.length+", n: "+n+", k: "+k); + Buffer[] dataPackets = new Buffer[k]; + Buffer[] checkPackets = new Buffer[n - k]; + Bucket[] buckets = new Bucket[n]; + DataInputStream[] readers = new DataInputStream[k]; + OutputStream[] writers = new OutputStream[n - k]; + + try { + + int[] toEncode = new int[n - k]; + int numberToEncode = 0; // can be less than n-k + + byte[] realBuffer = new byte[n * STRIPE_SIZE]; + + for (int i = 0; i < k; i++) + dataPackets[i] = new Buffer(realBuffer, i * STRIPE_SIZE, + STRIPE_SIZE); + for (int i = 0; i < n - k; i++) + checkPackets[i] = new Buffer(realBuffer, (i + k) * STRIPE_SIZE, + STRIPE_SIZE); + + for (int i = 0; i < dataBlockStatus.length; i++) { + buckets[i] = dataBlockStatus[i]; + long sz = buckets[i].size(); + if (sz < blockLength) { + if (i != dataBlockStatus.length - 1) + throw new IllegalArgumentException( + "All buckets except the last must be the full size"); + if (sz < blockLength) + buckets[i] = BucketTools.pad(buckets[i], blockLength, bf, (int) sz); + else + throw new IllegalArgumentException("Too big: " + sz + + " bigger than " + blockLength); + } + readers[i] = new DataInputStream(buckets[i].getInputStream()); + } + + for (int i = 0; i < checkBlockStatus.length; i++) { + buckets[i + k] = checkBlockStatus[i]; + if (buckets[i + k] == null) { + buckets[i + k] = bf.makeBucket(blockLength); + writers[i] = buckets[i + k].getOutputStream(); + toEncode[numberToEncode++] = i + k; + } else { + writers[i] = null; + } + } + +// Runtime.getRuntime().gc(); +// Runtime.getRuntime().runFinalization(); +// Runtime.getRuntime().gc(); +// Runtime.getRuntime().runFinalization(); + long memUsedBeforeEncodes = Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory(); + if(logMINOR) Logger.minor(this, "Memory in use before encodes: "+memUsedBeforeEncodes); + + if (numberToEncode > 0) { + // Do the (striped) encode + for (int offset = 0; offset < blockLength; offset += STRIPE_SIZE) { + long memUsedBeforeRead = Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory(); + if(logMINOR) Logger.minor(this, "Memory in use before read: "+memUsedBeforeRead); + // Read the data in first + for (int i = 0; i < k; i++) { + readers[i].readFully(realBuffer, i * STRIPE_SIZE, + STRIPE_SIZE); + } + // Do the encode + // Not shuffled + long startTime = System.currentTimeMillis(); +// Runtime.getRuntime().gc(); +// Runtime.getRuntime().runFinalization(); +// Runtime.getRuntime().gc(); +// Runtime.getRuntime().runFinalization(); + long memUsedBeforeStripe = Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory(); + if(logMINOR) Logger.minor(this, "Memory in use before stripe: "+memUsedBeforeStripe); + fec.encode(dataPackets, checkPackets, toEncode); +// Runtime.getRuntime().gc(); +// Runtime.getRuntime().runFinalization(); +// Runtime.getRuntime().gc(); +// Runtime.getRuntime().runFinalization(); + long memUsedAfterStripe = Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory(); + if(logMINOR) Logger.minor(this, "Memory in use after stripe: "+memUsedAfterStripe); + long endTime = System.currentTimeMillis(); + if(logMINOR) Logger.minor(this, "Stripe encode took " + + (endTime - startTime) + "ms for k=" + k + ", n=" + + n + ", stripeSize=" + STRIPE_SIZE); + // packets now contains an array of decoded blocks, in order + // Write the data out + for (int i = k; i < n; i++) { + if (writers[i - k] != null) + writers[i - k].write(realBuffer, i * STRIPE_SIZE, + STRIPE_SIZE); + } + } + } + + } finally { + + for (int i = 0; i < k; i++) + if (readers[i] != null) + readers[i].close(); + for (int i = 0; i < n - k; i++) + if (writers[i] != null) + writers[i].close(); + + } + // Set new buckets only after have a successful decode. + for (int i = 0; i < checkBlockStatus.length; i++) { + Bucket data = buckets[i + k]; + if (data == null) + throw new NullPointerException(); + checkBlockStatus[i] = data; + } + } } Modified: trunk/freenet/src/freenet/client/StandardOnionFECCodec.java =================================================================== --- trunk/freenet/src/freenet/client/StandardOnionFECCodec.java 2007-04-14 00:28:40 UTC (rev 12674) +++ trunk/freenet/src/freenet/client/StandardOnionFECCodec.java 2007-04-14 01:01:06 UTC (rev 12675) @@ -3,41 +3,27 @@ * http://www.gnu.org/ for further details of the GPL. */ package freenet.client; -import java.io.DataInputStream; import java.io.IOException; -import java.io.OutputStream; import java.util.LinkedList; import java.util.NoSuchElementException; import com.onionnetworks.fec.FECCode; import com.onionnetworks.fec.Native8Code; import com.onionnetworks.fec.PureCode; -import com.onionnetworks.util.Buffer; import freenet.support.LRUHashtable; import freenet.support.Logger; -import freenet.support.api.Bucket; -import freenet.support.api.BucketFactory; -import freenet.support.io.BucketTools; /** * FECCodec implementation using the onion code. */ public class StandardOnionFECCodec extends FECCodec { - - private static boolean logMINOR; - // REDFLAG: How big is one of these? private static int MAX_CACHED_CODECS = 8; - // REDFLAG: Optimal stripe size? Smaller => less memory usage, but more JNI overhead - private static int STRIPE_SIZE = 4096; static boolean noNative; private static final LRUHashtable recentlyUsedCodecs = new LRUHashtable(); - - private final FECCode fec; - private final int k, n; private static class MyKey { /** Number of input blocks */ @@ -110,269 +96,7 @@ logMINOR = Logger.shouldLog(Logger.MINOR, this); } - - private void realDecode(SplitfileBlock[] dataBlockStatus, SplitfileBlock[] checkBlockStatus, int blockLength, BucketFactory bf) throws IOException { - if(logMINOR) - Logger.minor(this, "Doing decode: " + dataBlockStatus.length - + " data blocks, " + checkBlockStatus.length - + " check blocks, block length " + blockLength + " with " - + this, new Exception("debug")); - if (dataBlockStatus.length + checkBlockStatus.length != n) - throw new IllegalArgumentException(); - if (dataBlockStatus.length != k) - throw new IllegalArgumentException(); - Buffer[] packets = new Buffer[k]; - Bucket[] buckets = new Bucket[n]; - DataInputStream[] readers = new DataInputStream[n]; - OutputStream[] writers = new OutputStream[k]; - int numberToDecode = 0; // can be less than n-k - - try { - - byte[] realBuffer = new byte[k * STRIPE_SIZE]; - - int[] packetIndexes = new int[k]; - for (int i = 0; i < packetIndexes.length; i++) - packetIndexes[i] = -1; - - int idx = 0; - - for (int i = 0; i < k; i++) - packets[i] = new Buffer(realBuffer, i * STRIPE_SIZE, - STRIPE_SIZE); - - for (int i = 0; i < dataBlockStatus.length; i++) { - buckets[i] = dataBlockStatus[i].getData(); - if (buckets[i] == null) { - buckets[i] = bf.makeBucket(blockLength); - writers[i] = buckets[i].getOutputStream(); - if(logMINOR) Logger.minor(this, "writers[" + i + "] != null"); - readers[i] = null; - numberToDecode++; - } else { - long sz = buckets[i].size(); - if (sz < blockLength) { - if (i != dataBlockStatus.length - 1) - throw new IllegalArgumentException( - "All buckets except the last must be the full size but data bucket " - + i + " of " - + dataBlockStatus.length + " (" - + dataBlockStatus[i] + ") is " + sz - + " not " + blockLength); - if (sz < blockLength) - buckets[i] = BucketTools.pad(buckets[i], blockLength, bf,(int) sz); - else - throw new IllegalArgumentException("Too big: " + sz - + " bigger than " + blockLength); - } - if(logMINOR) Logger.minor(this, "writers[" + i + "] = null (already filled)"); - writers[i] = null; - readers[i] = new DataInputStream(buckets[i] - .getInputStream()); - packetIndexes[idx++] = i; - } - } - for (int i = 0; i < checkBlockStatus.length; i++) { - buckets[i + k] = checkBlockStatus[i].getData(); - if (buckets[i + k] == null) { - readers[i + k] = null; - } else { - readers[i + k] = new DataInputStream(buckets[i + k] - .getInputStream()); - if (idx < k) - packetIndexes[idx++] = i + k; - } - } - - if (idx < k) - throw new IllegalArgumentException( - "Must have at least k packets (k="+k+",idx="+idx+ ')'); - - if(logMINOR) for (int i = 0; i < packetIndexes.length; i++) - Logger.minor(this, "[" + i + "] = " + packetIndexes[i]); - - if (numberToDecode > 0) { - // Do the (striped) decode - for (int offset = 0; offset < blockLength; offset += STRIPE_SIZE) { - // Read the data in first - for (int i = 0; i < k; i++) { - int x = packetIndexes[i]; - readers[x].readFully(realBuffer, i * STRIPE_SIZE, - STRIPE_SIZE); - } - // Do the decode - // Not shuffled - int[] disposableIndexes = new int[packetIndexes.length]; - System.arraycopy(packetIndexes, 0, disposableIndexes, 0, - packetIndexes.length); - fec.decode(packets, disposableIndexes); - // packets now contains an array of decoded blocks, in order - // Write the data out - for (int i = 0; i < k; i++) { - if (writers[i] != null) - writers[i].write(realBuffer, i * STRIPE_SIZE, - STRIPE_SIZE); - } - } - } - - } finally { - - for (int i = 0; i < k; i++) { - if (writers[i] != null) - writers[i].close(); - } - for (int i = 0; i < n; i++) { - if (readers[i] != null) - readers[i].close(); - } - - } - // Set new buckets only after have a successful decode. - // Note that the last data bucket will be overwritten padded. - for (int i = 0; i < dataBlockStatus.length; i++) { - Bucket data = buckets[i]; - if (data.size() != blockLength) - throw new IllegalStateException("Block " + i + ": " + data - + " : " + dataBlockStatus[i] + " length " + data.size()); - dataBlockStatus[i].setData(data); - } - } - - /** - * Do the actual encode. - */ - private void realEncode(Bucket[] dataBlockStatus, - Bucket[] checkBlockStatus, int blockLength, BucketFactory bf) - throws IOException { -// Runtime.getRuntime().gc(); -// Runtime.getRuntime().runFinalization(); -// Runtime.getRuntime().gc(); -// Runtime.getRuntime().runFinalization(); - long memUsedAtStart = Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory(); - if(logMINOR) { - Logger.minor(this, "Memory in use at start: "+memUsedAtStart+" max="+Runtime.getRuntime().maxMemory()); - Logger.minor(this, "Doing encode: " + dataBlockStatus.length - + " data blocks, " + checkBlockStatus.length - + " check blocks, block length " + blockLength + " with " - + this); - } - if ((dataBlockStatus.length + checkBlockStatus.length != n) || - (dataBlockStatus.length != k)) - throw new IllegalArgumentException("Data blocks: "+dataBlockStatus.length+", Check blocks: "+checkBlockStatus.length+", n: "+n+", k: "+k); - Buffer[] dataPackets = new Buffer[k]; - Buffer[] checkPackets = new Buffer[n - k]; - Bucket[] buckets = new Bucket[n]; - DataInputStream[] readers = new DataInputStream[k]; - OutputStream[] writers = new OutputStream[n - k]; - - try { - - int[] toEncode = new int[n - k]; - int numberToEncode = 0; // can be less than n-k - - byte[] realBuffer = new byte[n * STRIPE_SIZE]; - - for (int i = 0; i < k; i++) - dataPackets[i] = new Buffer(realBuffer, i * STRIPE_SIZE, - STRIPE_SIZE); - for (int i = 0; i < n - k; i++) - checkPackets[i] = new Buffer(realBuffer, (i + k) * STRIPE_SIZE, - STRIPE_SIZE); - - for (int i = 0; i < dataBlockStatus.length; i++) { - buckets[i] = dataBlockStatus[i]; - long sz = buckets[i].size(); - if (sz < blockLength) { - if (i != dataBlockStatus.length - 1) - throw new IllegalArgumentException( - "All buckets except the last must be the full size"); - if (sz < blockLength) - buckets[i] = BucketTools.pad(buckets[i], blockLength, bf, (int) sz); - else - throw new IllegalArgumentException("Too big: " + sz - + " bigger than " + blockLength); - } - readers[i] = new DataInputStream(buckets[i].getInputStream()); - } - - for (int i = 0; i < checkBlockStatus.length; i++) { - buckets[i + k] = checkBlockStatus[i]; - if (buckets[i + k] == null) { - buckets[i + k] = bf.makeBucket(blockLength); - writers[i] = buckets[i + k].getOutputStream(); - toEncode[numberToEncode++] = i + k; - } else { - writers[i] = null; - } - } - -// Runtime.getRuntime().gc(); -// Runtime.getRuntime().runFinalization(); -// Runtime.getRuntime().gc(); -// Runtime.getRuntime().runFinalization(); - long memUsedBeforeEncodes = Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory(); - if(logMINOR) Logger.minor(this, "Memory in use before encodes: "+memUsedBeforeEncodes); - - if (numberToEncode > 0) { - // Do the (striped) encode - for (int offset = 0; offset < blockLength; offset += STRIPE_SIZE) { - long memUsedBeforeRead = Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory(); - if(logMINOR) Logger.minor(this, "Memory in use before read: "+memUsedBeforeRead); - // Read the data in first - for (int i = 0; i < k; i++) { - readers[i].readFully(realBuffer, i * STRIPE_SIZE, - STRIPE_SIZE); - } - // Do the encode - // Not shuffled - long startTime = System.currentTimeMillis(); -// Runtime.getRuntime().gc(); -// Runtime.getRuntime().runFinalization(); -// Runtime.getRuntime().gc(); -// Runtime.getRuntime().runFinalization(); - long memUsedBeforeStripe = Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory(); - if(logMINOR) Logger.minor(this, "Memory in use before stripe: "+memUsedBeforeStripe); - fec.encode(dataPackets, checkPackets, toEncode); -// Runtime.getRuntime().gc(); -// Runtime.getRuntime().runFinalization(); -// Runtime.getRuntime().gc(); -// Runtime.getRuntime().runFinalization(); - long memUsedAfterStripe = Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory(); - if(logMINOR) Logger.minor(this, "Memory in use after stripe: "+memUsedAfterStripe); - long endTime = System.currentTimeMillis(); - if(logMINOR) Logger.minor(this, "Stripe encode took " - + (endTime - startTime) + "ms for k=" + k + ", n=" - + n + ", stripeSize=" + STRIPE_SIZE); - // packets now contains an array of decoded blocks, in order - // Write the data out - for (int i = k; i < n; i++) { - if (writers[i - k] != null) - writers[i - k].write(realBuffer, i * STRIPE_SIZE, - STRIPE_SIZE); - } - } - } - - } finally { - - for (int i = 0; i < k; i++) - if (readers[i] != null) - readers[i].close(); - for (int i = 0; i < n - k; i++) - if (writers[i] != null) - writers[i].close(); - - } - // Set new buckets only after have a successful decode. - for (int i = 0; i < checkBlockStatus.length; i++) { - Bucket data = buckets[i + k]; - if (data == null) - throw new NullPointerException(); - checkBlockStatus[i] = data; - } - } - + public int countCheckBlocks() { return n-k; } @@ -446,12 +170,10 @@ // Encode it try { - // FIXME refactor to eliminate casting and have a single FECRunner for all codecs - StandardOnionFECCodec codec = (StandardOnionFECCodec) job.codec; if(job.isADecodingJob) { - codec.realDecode(job.dataBlockStatus, job.checkBlockStatus, job.blockLength, job.bucketFactory); + job.codec.realDecode(job.dataBlockStatus, job.checkBlockStatus, job.blockLength, job.bucketFactory); } else { - codec.realEncode(job.dataBlocks, job.checkBlocks, job.blockLength, job.bucketFactory); + job.codec.realEncode(job.dataBlocks, job.checkBlocks, job.blockLength, job.bucketFactory); // Update SplitFileBlocks from buckets if necessary if((job.dataBlockStatus != null) || (job.checkBlockStatus != null)){ for(int i=0;i<job.dataBlocks.length;i++)
