Author: toad
Date: 2006-09-26 23:18:01 +0000 (Tue, 26 Sep 2006)
New Revision: 10514
Added:
trunk/contrib/fec_src/com/onionnetworks/fec/io/
trunk/contrib/fec_src/com/onionnetworks/fec/io/BlockAlreadyDecodedException.java
trunk/contrib/fec_src/com/onionnetworks/fec/io/BlockDecodedEvent.java
trunk/contrib/fec_src/com/onionnetworks/fec/io/DuplicatePacketException.java
trunk/contrib/fec_src/com/onionnetworks/fec/io/FECFile.java
trunk/contrib/fec_src/com/onionnetworks/fec/io/FECIOEvent.java
trunk/contrib/fec_src/com/onionnetworks/fec/io/FECIOListener.java
trunk/contrib/fec_src/com/onionnetworks/fec/io/FECParameters.java
trunk/contrib/fec_src/com/onionnetworks/fec/io/FileAlreadyDecodedException.java
trunk/contrib/fec_src/com/onionnetworks/fec/io/FileDecodedEvent.java
trunk/contrib/fec_src/com/onionnetworks/fec/io/PacketEntryAddedEvent.java
trunk/contrib/fec_src/com/onionnetworks/fec/io/PacketInfo.java
trunk/contrib/fec_src/com/onionnetworks/fec/io/PacketNotFoundException.java
trunk/contrib/fec_src/com/onionnetworks/fec/io/PacketPlacement.java
trunk/contrib/fec_src/com/onionnetworks/fec/io/PacketWrittenEvent.java
Log:
Added:
trunk/contrib/fec_src/com/onionnetworks/fec/io/BlockAlreadyDecodedException.java
===================================================================
---
trunk/contrib/fec_src/com/onionnetworks/fec/io/BlockAlreadyDecodedException.java
2006-09-26 20:10:33 UTC (rev 10513)
+++
trunk/contrib/fec_src/com/onionnetworks/fec/io/BlockAlreadyDecodedException.java
2006-09-26 23:18:01 UTC (rev 10514)
@@ -0,0 +1,59 @@
+package com.onionnetworks.fec.io;
+
+import java.io.IOException;
+
+/**
+ * Signals an attempt to write a packet into a block that has already been
+ * decoded.  The block number and stripe number of the offending packet are
+ * carried along with the exception.
+ *
+ * (c) Copyright 2001 Onion Networks
+ * (c) Copyright 2000 OpenCola
+ *
+ * @author Justin F. Chapweske (justin at chapweske.com)
+ */
+public class BlockAlreadyDecodedException extends IOException {
+
+    int blockNum;
+    int stripeNum;
+
+    /**
+     * Creates the exception without a detail message.
+     *
+     * @param blockNum The blockNum of the attempted packet
+     * @param stripeNum The stripeNum of the attempted packet
+     */
+    public BlockAlreadyDecodedException(int blockNum, int stripeNum) {
+        this(null, blockNum, stripeNum);
+    }
+
+    /**
+     * Creates the exception with a detail message.
+     *
+     * @param msg An informational message to include
+     * @param blockNum The blockNum of the attempted packet
+     * @param stripeNum The stripeNum of the attempted packet
+     */
+    public BlockAlreadyDecodedException(String msg, int blockNum,
+                                        int stripeNum) {
+        super(msg);
+        this.stripeNum = stripeNum;
+        this.blockNum = blockNum;
+    }
+
+    /**
+     * @return The blockNum of the attempted packet.
+     */
+    public int getBlockNum() {
+        return this.blockNum;
+    }
+
+    /**
+     * @return The stripeNum of the attempted packet.
+     */
+    public int getStripeNum() {
+        return this.stripeNum;
+    }
+}
Added: trunk/contrib/fec_src/com/onionnetworks/fec/io/BlockDecodedEvent.java
===================================================================
--- trunk/contrib/fec_src/com/onionnetworks/fec/io/BlockDecodedEvent.java
2006-09-26 20:10:33 UTC (rev 10513)
+++ trunk/contrib/fec_src/com/onionnetworks/fec/io/BlockDecodedEvent.java
2006-09-26 23:18:01 UTC (rev 10514)
@@ -0,0 +1,27 @@
+package com.onionnetworks.fec.io;
+
+/**
+ * Event announcing that a complete block was decoded.  It records the
+ * number of that block.
+ *
+ * (c) Copyright 2001 Onion Networks
+ * (c) Copyright 2000 OpenCola
+ *
+ * @author Justin F. Chapweske (justin at chapweske.com)
+ */
+public class BlockDecodedEvent extends FECIOEvent {
+
+    int blockNum;
+
+    /**
+     * @param source The object on which the event occurred
+     * @param blockNum The blockNum of the block that was decoded
+     */
+    public BlockDecodedEvent(Object source, int blockNum) {
+        super(source);
+        this.blockNum = blockNum;
+    }
+
+    /**
+     * @return the blockNum of the block that was decoded.
+     */
+    public int getBlockNum() {
+        return this.blockNum;
+    }
+}
+
Added:
trunk/contrib/fec_src/com/onionnetworks/fec/io/DuplicatePacketException.java
===================================================================
---
trunk/contrib/fec_src/com/onionnetworks/fec/io/DuplicatePacketException.java
2006-09-26 20:10:33 UTC (rev 10513)
+++
trunk/contrib/fec_src/com/onionnetworks/fec/io/DuplicatePacketException.java
2006-09-26 23:18:01 UTC (rev 10514)
@@ -0,0 +1,56 @@
+package com.onionnetworks.fec.io;
+
+import java.io.IOException;
+
+/**
+ * Signals an attempt to write a packet that has already been written.  The
+ * exception carries the block and stripe number of the attempted packet and
+ * the index of the copy that already exists.
+ *
+ * (c) Copyright 2001 Onion Networks
+ * (c) Copyright 2000 OpenCola
+ *
+ * @author Justin F. Chapweske (justin at chapweske.com)
+ */
+public class DuplicatePacketException extends IOException {
+
+    int blockNum;
+    int stripeNum;
+    int packetIndex;
+
+    /**
+     * Creates the exception without a detail message.
+     *
+     * @param blockNum The blockNum of the attempted packet
+     * @param stripeNum The stripeNum of the attempted packet
+     * @param packetIndex The index of the existing copy, or -1
+     */
+    public DuplicatePacketException(int blockNum, int stripeNum,
+                                    int packetIndex) {
+        this(null, blockNum, stripeNum, packetIndex);
+    }
+
+    /**
+     * Creates the exception with a detail message.
+     *
+     * @param msg An informational message to include
+     * @param blockNum The blockNum of the attempted packet
+     * @param stripeNum The stripeNum of the attempted packet
+     * @param packetIndex The index of the existing copy, or -1
+     */
+    public DuplicatePacketException(String msg, int blockNum, int stripeNum,
+                                    int packetIndex) {
+        super(msg);
+        this.packetIndex = packetIndex;
+        this.stripeNum = stripeNum;
+        this.blockNum = blockNum;
+    }
+
+    /**
+     * @return the blockNum of the attempted packet
+     */
+    public int getBlockNum() {
+        return this.blockNum;
+    }
+
+    /**
+     * @return the stripeNum of the attempted packet
+     */
+    public int getStripeNum() {
+        return this.stripeNum;
+    }
+
+    /**
+     * @return the packetIndex of the existing copy of the packet. -1 if the
+     * packetIndex is unknown or there was an attempt to write a padding packet
+     */
+    public int getPacketIndex() {
+        return this.packetIndex;
+    }
+}
Added: trunk/contrib/fec_src/com/onionnetworks/fec/io/FECFile.java
===================================================================
--- trunk/contrib/fec_src/com/onionnetworks/fec/io/FECFile.java 2006-09-26
20:10:33 UTC (rev 10513)
+++ trunk/contrib/fec_src/com/onionnetworks/fec/io/FECFile.java 2006-09-26
23:18:01 UTC (rev 10514)
@@ -0,0 +1,805 @@
+package com.onionnetworks.fec.io;
+
+import java.io.*;
+import java.util.*;
+import java.security.*;
+
+import org.apache.log4j.Category;
+
+import com.onionnetworks.fec.FECCode;
+import com.onionnetworks.fec.FECCodeFactory;
+import com.onionnetworks.io.RAF;
+import com.onionnetworks.util.*;
+import EDU.oswego.cs.dl.util.concurrent.*;
+
+/**
+ * This class provides the necessary file IO routines to go along with the raw
+ * FEC codes. It is completely thread safe for multiple readers and writers,
+ * and will automatically encode and decode the packets as necessary. If
+ * the FECFile is originally opened in "rw" mode it will revert to "r" mode
+ * once it has received enough data and decoded the entire file.
+ *
+ * File Encoding Example:
+ * <code>
+ * File f = new File(args[0]);
+ *
+ * int k = 32; // source packet per block
+ * int n = 256; // number of packets to expand each block to.
+ * int packetSize = 1024; // number of bytes in each packet.
+ *
+ * FECParameters params = new FECParameters(k,n,packetSize,f.length());
+ *
+ * FECFile fecF = new FECFile(f,"r",params);
+ *
+ * // Read the packet with blockNum=0,stripeNum=32 into the Buffer
+ * // The read() interface is much faster if you encode multiple packets
+ * // per block and encourages you to do so. We are not going to for
+ * // simplicity.
+ * Buffer b = new Buffer(packetSize);
+ * fecF.read(new Buffer[] {b}, 0, new int[] {32});
+ * </code>
+ *
+ * Please see tests/FECFileTest for a further example.
+ *
+ * <ul>
+ * Thread/synchronization behavior:
+ *
+ * The FECFile tries to maintain as little contention as possible, but seeing
+ * as it involves both IO intensive and processor intensive functions, this is
+ * rather difficult.
+ *
+ * <li> Disk IO is fully synchronized. This doesn't really make any difference
+ * because you can't do two parallel IO operations on the same file anyway.
+ *
+ * <li> The highest level of synchronization is per-block reader/writer locks.
+ * This means that any number of threads may read/encode in parallel and that
+ * a write or decode operation blocks all other threads trying to access the
+ * same block. Since encoding is a CPU intensive task, there will be minimal
+ * gains from parallel encoding, but the opportunity for encoding and disk IO
+ * to occur in parallel may afford some savings.
+ * </ul>
+ *
+ * (c) Copyright 2001 Onion Networks
+ * (c) Copyright 2000 OpenCola
+ *
+ * @author Justin F. Chapweske (justin at chapweske.com)
+ */
+public class FECFile {
+
+ static Category cat = Category.getInstance(FECFile.class.getName());
+
+ private FECParameters params;
+
+ private RAF raf; // synched RandomAccessFile
+ private PacketPlacement pp;
+ private FileIntegrity integrity;
+ private MessageDigest md;
+
+ private FECCode code;
+
+ private ReflectiveEventDispatch dispatch;
+
+ private int k,n,packetSize,blockSize,blockCount;
+
+ private ReadWriteLock[] locks;
+
+ private Decoder decoder;
+
+ protected ExceptionHandler exceptionHandler;
+
+    /**
+     * Convenience constructor that opens the file without a FileIntegrity.
+     * It simply forwards to the four-argument constructor with a
+     * <code>null</code> integrity.  Because that constructor rejects a
+     * missing integrity in "rw" mode, this variant is only usable for
+     * read-only ("r") access.
+     *
+     * @param f The File from which the data will be read
+     * @param mode Either "r" or "rw"
+     * @param params The FECParameters that specify how the data will be
+     * encoded and decoded.
+     * @throws IOException If the underlying file can not be opened.
+     */
+    public FECFile(File f, String mode, FECParameters params)
+        throws IOException {
+        this(f, mode, params, (FileIntegrity) null);
+    }
+
+    /**
+     * Opens the file according to <code>mode</code> (either "r" or "rw")
+     * and prepares encoding/decoding according to the FECParameters.
+     *
+     * In "rw" mode this additionally creates the event dispatcher, the
+     * message digest named by the integrity, the packet placement, and
+     * starts the background "Decoder Thread".  Every other mode is treated
+     * as read-only.
+     *
+     * All argument checks are done before the file is opened, so a rejected
+     * call does not leave an open file handle behind.
+     *
+     * @param f The File to which the data will be read/written
+     * @param mode Either "r" or "rw"
+     * @param params The FECParameters that specify how the data will be
+     * encoded and decoded.
+     * @param integrity Used for checking the file integrity.  Required in
+     * "rw" mode, must be <code>null</code> otherwise.
+     *
+     * @throws IOException If the underlying file can not be opened.
+     * @throws IllegalArgumentException If the integrity is missing in "rw"
+     * mode, is supplied in read-only mode, or names an unknown digest
+     * algorithm.
+     */
+    public FECFile(File f, String mode, FECParameters params,
+                   FileIntegrity integrity) throws IOException {
+        final boolean readWrite = mode.equals("rw");
+
+        // Validate the mode/integrity combination up front, before any
+        // resource (the RAF) has been allocated.
+        if (readWrite && integrity == null) {
+            throw new IllegalArgumentException("Integrity requried in rw");
+        }
+        if (!readWrite && integrity != null) {
+            throw new IllegalArgumentException
+                ("Integrity not used in read-only mode");
+        }
+
+        // set up the message digest (only used in rw mode)
+        MessageDigest digest = null;
+        if (readWrite) {
+            try {
+                digest = MessageDigest.getInstance(integrity.getAlgorithm());
+            } catch (NoSuchAlgorithmException e) {
+                throw new IllegalArgumentException(e.getMessage());
+            }
+        }
+
+        this.params = params;
+        this.integrity = integrity;
+        this.k = params.getK();
+        this.n = params.getN();
+        this.packetSize = params.getPacketSize();
+        this.blockSize = params.getUnexpandedBlockSize();
+        this.blockCount = params.getBlockCount();
+
+        this.code = FECCodeFactory.getDefault().createFECCode(k,n);
+        this.raf = new RAF(f,mode); // synched RandomAccessFile
+
+        // Create the locks, one per block.
+        locks = new ReadWriteLock[blockCount];
+        for (int i=0;i<locks.length;i++) {
+            locks[i] = new ReentrantWriterPreferenceReadWriteLock();
+        }
+
+        // add the default exception handler.
+        setDecodeExceptionHandler(new ExceptionHandler() {
+                public void handleException(ExceptionEvent ev) {
+                    cat.warn("Set an ExceptionHandler via "+
+                             "FECFile.setDecodeExceptionHandler()",
+                             ev.getException());
+                }
+            });
+
+        if (readWrite) {
+            // create the event dispatcher (only used in rw mode)
+            dispatch = new ReflectiveEventDispatch();
+            md = digest;
+
+            // create the packet placement.  Once it is created it cannot
+            // be deleted to guarantee the thread-safety of some of the
+            // utility methods in this class.  Also note that the pp
+            // must never be set outside of the constructor.
+            this.pp = new PacketPlacement(params);
+
+            // set up and start the decoder.
+            decoder = new Decoder();
+            addFECIOListener(decoder);
+            new Thread(decoder,"Decoder Thread").start();
+        }
+    }
+
+    /**
+     * Installs the handler that is notified about exceptions raised while
+     * decoding.
+     *
+     * @param eh The object that will handle the exceptions.
+     */
+    public synchronized void setDecodeExceptionHandler(ExceptionHandler eh) {
+        exceptionHandler = eh;
+    }
+
+    /**
+     * @return The handler currently installed for decoding problems.
+     */
+    protected synchronized ExceptionHandler getDecodeExceptionHandler() {
+        return this.exceptionHandler;
+    }
+
+    // Buffers are allocated fresh on every call instead of being shared
+    // globally; the hope is that the GC reclaims them quickly.  Should that
+    // turn out to be too slow, a dedicated BufferPool would be needed.
+    protected static final byte[] createBuffer(int len) {
+        final byte[] fresh = new byte[len];
+        return fresh;
+    }
+
+    /**
+     * @return A new buffer of <code>blockSize</code> bytes.
+     */
+    protected final byte[] createBuffer() {
+        final int len = blockSize;
+        return createBuffer(len);
+    }
+
+    /**
+     * Splits a block-sized byte[] into k Buffers of packetSize bytes each.
+     * The Buffers are views onto <code>b</code> at consecutive offsets.
+     *
+     * @param b The array to wrap; must be exactly blockSize bytes long.
+     * @return The k Buffers covering <code>b</code>.
+     * @throws IllegalArgumentException If b.length differs from blockSize.
+     */
+    protected final Buffer[] wrapBuffer(byte[] b) {
+        if (b.length == blockSize) {
+            final Buffer[] wrapped = new Buffer[k];
+            int offset = 0;
+            for (int idx = 0; idx < wrapped.length; idx++) {
+                wrapped[idx] = new Buffer(b,offset,packetSize);
+                offset += packetSize;
+            }
+            return wrapped;
+        }
+        throw new IllegalArgumentException("b.length != "+blockSize);
+    }
+
+    /**
+     * Sets the destination file by forwarding to the underlying RAF.  This
+     * allows the destination to be chosen lazily, e.g. while a download is
+     * already running and the user is still choosing the final location.
+     *
+     * NOTE(review): the original documentation states that the final write
+     * (the one that moves and re-opens the file) blocks until the
+     * destination is set; that behavior lives in RAF and is not visible
+     * here, so set the destination as early as possible.
+     *
+     * @param destFile The file the data should finally end up in.
+     * @throws IOException If the underlying RAF reports an error.
+     */
+    public void renameTo(File destFile) throws IOException {
+        final RAF target = raf;
+        target.renameTo(destFile);
+    }
+
+    /**
+     * Reads a number of packets (encoding them if necessary) into the
+     * provided buffers.  An array of stripeNums and Buffers is accepted
+     * because it is vastly more efficient to encode several packets of a
+     * block at once than across multiple method calls.
+     *
+     * If the file is read-only or the block is already decoded, the whole
+     * block is read under the block's read lock and the requested packets
+     * are encoded after the lock has been released.  Otherwise the packets
+     * are copied straight from disk (padding packets are zero filled).
+     *
+     * File offsets are computed with long arithmetic so that files larger
+     * than 2GB are addressed correctly.
+     *
+     * @param pkts The array of Buffers into which the data will be stored
+     * each Buffer must be <code>params.getPacketSize()</code> in length.
+     * @param blockNum The block num of the packets to encode.
+     * @param stripeNums The stripe nums of the packets to encode.
+     *
+     * @throws IOException If there is an IOException in the underlying file
+     * IO, or (as InterruptedIOException) if the thread is interrupted while
+     * waiting for the block lock.
+     * @throws PacketNotFoundException If the desired packet can not be found
+     * in the PacketPlacement.
+     * @throws IllegalArgumentException If blockNum is out of range.
+     */
+    public void read(Buffer[] pkts, int blockNum, int[] stripeNums)
+        throws IOException, PacketNotFoundException {
+
+        if (blockNum < 0 || blockNum >= blockCount) {
+            // Don't let an empty stripeNums array mask the real problem
+            // with an ArrayIndexOutOfBoundsException.
+            String firstStripe = stripeNums.length > 0 ?
+                String.valueOf(stripeNums[0]) : "<none>";
+            throw new IllegalArgumentException
+                ("Illegal block# : blockNum="+blockNum+
+                 ",stripeNum="+firstStripe);
+        }
+
+        int ubs = -1;
+        byte[] b = null;
+
+        try {
+            locks[blockNum].readLock().acquire();
+            try {
+
+                // This raf check then pp access is safe because all
+                // locks must be acquired to change the raf's mode.
+                if (raf.getMode().equals("r") || pp.isBlockDecoded(blockNum)) {
+                    // FIX if all of the desired packets are on disk then
+                    // we shouldn't read in the whole block.
+                    // read inside the lock and encode outside.
+                    ubs = params.getUnexpandedBlockSize(blockNum);
+                    b = createBuffer();
+                    raf.seekAndReadFully((long) blockNum*blockSize,b,0,ubs);
+
+                } else {
+                    // read the packets straight from disk.
+                    for (int i=0;i<stripeNums.length;i++) {
+                        if (params.isPaddingPacket(blockNum,stripeNums[i])) {
+                            // deal with padding packet.
+                            Util.bzero(pkts[i].b,pkts[i].off,pkts[i].len);
+                            continue;
+                        }
+                        int packetIndex = pp.getPacketIndex
+                            (blockNum,stripeNums[i]);
+                        if (packetIndex == -1) {
+                            throw new PacketNotFoundException
+                                ("Packet not on disk: blockNum="+blockNum+
+                                 ",stripeNum="+stripeNums[i],blockNum,
+                                 stripeNums[i]);
+                        }
+
+                        // we read in whole packets because they are already
+                        // zero padded if necessary.
+                        raf.seekAndReadFully((long) packetIndex*packetSize,
+                                             pkts[i].b,pkts[i].off,
+                                             pkts[i].len);
+                    }
+                    return;
+                }
+            } finally {
+                locks[blockNum].readLock().release();
+            }
+        } catch (InterruptedException e) {
+            throw new InterruptedIOException(e.toString());
+        }
+
+        // encode outside of the locks.
+        // Keep the gaps as zero's for encoding.
+        Util.bzero(b,ubs,b.length-ubs);
+        code.encode(wrapBuffer(b),pkts,stripeNums);
+    }
+
+
+ /**
+ * Writes a packet to disk. If the packet is the k'th packet that
+ * has been written for this block, then the block will be decoded in
+ * this call. Since decoding is a relatively costly process it may
+ * cause your program to experience some bursty behavior in how long
+ * the write calls take. None-the-less if you are making any timing
+ * judgements based off of IO then you are an idiot.
+ *
+ * If this packet is the last packet that needs to be written to disk
+ * in order to decode the whole file, the block will be decoded and
+ * then the file will be properly truncated, closed, moved to the
+ * destination file and re-opened in read-only mode. If the destFile
+ * has not been set yet then this will block until that is set.
+ *
+ * @param pkt The Buffer from which the data will be written.
+ * @param blockNum The blockNum of the packet to be written.
+ * @param stripeNum The stripeNum of the packet to be written.
+ *
+ * @throws IOException When there is a disk IOException.
+ * @throws DuplicatePacketException When there is an attempt to write a
+ * packet a second time.
+ * @throws BlockAlreadyDecodedException When the block is already decoded.
+ * @throws FileAlreadyDecodedException When you are trying to write a
+ * packet when the file has been decoded and switched to read-only
+ *
+ * @return The block packet count after writing this packet. This
+ * gives you the order the packets were written in for this block.
+ */
+ public int write(Buffer pkt, int blockNum, int stripeNum)
+ throws IOException, FileAlreadyDecodedException {
+
+ int result = -1;
+ try {
+ locks[blockNum].writeLock().acquire();
+ try {
+ result = write0(pkt,blockNum,stripeNum);
+ } finally {
+ locks[blockNum].writeLock().release();
+ }
+ } catch (InterruptedException e) {
+ throw new InterruptedIOException(e.toString());
+ }
+
+ // Hopefully this will give the decoder thread a chance to catch up.
+ if (result >= k) {
+ Thread.yield();
+ }
+ return result;
+ }
+
+    /**
+     * Performs the actual packet write.  The caller must hold
+     * locks[blockNum].writeLock().
+     *
+     * The packet is rejected if the file is read-only, if it is a padding
+     * packet, if its block is already decoded or if it was written before.
+     * Otherwise an entry is added to the packet placement, the data is
+     * written at that entry's position and a PacketWrittenEvent is fired.
+     *
+     * @param pkt The Buffer from which the data will be written.
+     * @param blockNum The blockNum of the packet to be written.
+     * @param stripeNum The stripeNum of the packet to be written.
+     * @return The packet count of the block after adding this packet.
+     *
+     * @throws IOException When there is a disk IOException.
+     * @throws DuplicatePacketException For duplicate and padding packets.
+     * @throws BlockAlreadyDecodedException When the block is already decoded.
+     * @throws FileAlreadyDecodedException When the file is read-only.
+     */
+    private int write0(Buffer pkt, int blockNum, int stripeNum)
+        throws IOException, DuplicatePacketException,
+               BlockAlreadyDecodedException, FileAlreadyDecodedException {
+
+        //cat.debug("Writing blockNum="+blockNum+",stripeNum="+stripeNum);
+
+        // read-only
+        if (raf.getMode().equals("r")) {
+            throw new FileAlreadyDecodedException
+                ("Attempted to write packet in read-only mode");
+        }
+
+        // padding packet
+        if (params.isPaddingPacket(blockNum,stripeNum)) {
+            cat.warn
+                ("You have attempted to write a padding packet which is "+
+                 "is already generated by this FECFile and shouldn't be "+
+                 "sent across the network. Talk to Justin for more info. "+
+                 "blockNum="+blockNum+",stripeNum="+stripeNum);
+            throw new DuplicatePacketException
+                ("Attempted padding packet write. blockNum="+blockNum+
+                 ",stripeNum="+stripeNum,blockNum,stripeNum,-1);
+        }
+
+        // block already decoded
+        if (pp.isBlockDecoded(blockNum)) {
+            throw new BlockAlreadyDecodedException
+                ("Block already decoded : blockNum="+blockNum+",stripeNum="+
+                 stripeNum,blockNum,stripeNum);
+        }
+
+        // duplicate packet (single lookup; fields of the message are now
+        // comma separated like in the other messages of this class)
+        int existingIndex = pp.getPacketIndex(blockNum,stripeNum);
+        if (existingIndex != -1) {
+            throw new DuplicatePacketException
+                ("Duplicate packet: blockNum="+blockNum+",stripeNum="+
+                 stripeNum+",index="+existingIndex,
+                 blockNum,stripeNum,existingIndex);
+        }
+
+        int packetIndex = -1;
+        int packetCount = -1;
+        // sync around the calls so that the packetCount is correct.
+        synchronized (pp) {
+            packetIndex = pp.addPacketEntry(blockNum,stripeNum);
+            packetCount = pp.getPacketCount(blockNum);
+        }
+
+        // long arithmetic: the offset may exceed the int range for files
+        // larger than 2GB.
+        raf.seekAndWrite((long) packetIndex*packetSize,
+                         pkt.b,pkt.off,pkt.len);
+
+        fire(new PacketWrittenEvent(this,blockNum,stripeNum,packetCount));
+        return packetCount;
+    }
+
+    /**
+     * Try to decode the specified block.  This method reads the given
+     * stripes of the block, decodes them in place, verifies the result
+     * against the block hash of the FileIntegrity and - only if the hash
+     * matches - writes the decoded block back to disk under the block's
+     * write lock and marks it as decoded.
+     *
+     * @param blockNum The block to decode.
+     * @param stripeNums The stripes to decode the block from.
+     * @return true if the block was successfully decoded and verified,
+     * false if the hash check failed (nothing is written in that case).
+     * @throws IOException If reading or writing fails, or (as
+     * InterruptedIOException) if the thread is interrupted while waiting
+     * for the block lock.
+     */
+    protected boolean tryDecode(int blockNum, int[] stripeNums)
+        throws IOException {
+        cat.debug("trying to decode block : blockNum="+blockNum);
+
+        byte[] b = createBuffer();
+        Buffer[] bufs = wrapBuffer(b);
+        // read the packets from disk.
+        read(bufs,blockNum,stripeNums);
+
+        // decode the block in place.
+        code.decode(bufs,stripeNums);
+
+        // check the integrity
+        int ubs = params.getUnexpandedBlockSize(blockNum);
+        synchronized (md) {
+            md.update(b,0,ubs);
+            Buffer hash = integrity.getBlockHash(blockNum);
+            if (!Util.arraysEqual(md.digest(),0,hash.b,hash.off,hash.len)) {
+                return false;
+            }
+        }
+
+        try {
+            locks[blockNum].writeLock().acquire();
+            try {
+                // seek and write the decoded block.  The offset is computed
+                // as a long so it can not overflow for files beyond 2GB.
+                raf.seekAndWrite((long) blockNum*blockSize,b,0,b.length);
+                // Update the placement to show decoded entries.
+                pp.setBlockDecoded(blockNum);
+            } finally {
+                locks[blockNum].writeLock().release();
+            }
+        } catch (InterruptedException e) {
+            throw new InterruptedIOException(e.toString());
+        }
+
+        fire(new BlockDecodedEvent(this,blockNum));
+        return true;
+    }
+
+ /**
+ * Acquires all write block locks. This method ABSOLUTELY CANNOT be called
+ * if a blockLock has already been acquired by this thread because
+ * of the possibility for deadly embrace.
+ */
+ public void acquireAllWriteLocks() throws InterruptedException {
+ for (int i=0;i<locks.length;i++) {
+ locks[i].writeLock().acquire();
+ }
+ }
+
+ /**
+ * Releases all write block locks. This method ABSOLUTELY CANNOT be called
+ * if a blockLock has already been acquired by this thread because
+ * of the possibility for deadly embrace.
+ */
+ public void releaseAllWriteLocks() throws InterruptedException {
+ for (int i=0;i<locks.length;i++) {
+ locks[i].writeLock().release();
+ }
+ }
+
+    /**
+     * Close the underlying file descriptor and free up the resources
+     * associated with this FECFile.
+     *
+     * The file is closed while all block write locks are held.  Once the
+     * locks were obtained, the decoder and the event dispatcher are shut
+     * down even if closing the file fails, so their threads are not left
+     * behind by an IOException.
+     *
+     * @throws IOException If closing the file fails, or (as
+     * InterruptedIOException) if the thread is interrupted while waiting
+     * for the block locks; in the latter case nothing has been closed.
+     */
+    public void close() throws IOException {
+        try {
+            acquireAllWriteLocks();
+            try {
+                try {
+                    raf.close();
+                } finally {
+                    releaseAllWriteLocks();
+                }
+            } finally {
+                if (decoder != null) {
+                    // clean up the decoder.
+                    decoder.close();
+                }
+
+                if (dispatch != null) {
+                    // Free up the dispatch thread.
+                    dispatch.close();
+                }
+            }
+        } catch (InterruptedException e) {
+            throw new InterruptedIOException(e.toString());
+        }
+    }
+
+    /**
+     * Registers a listener for the events of this FECFile.
+     *
+     * @param fil The listener to register.
+     * @throws IllegalStateException If there is no event dispatcher, which
+     * is the case when the file was opened read-only.
+     */
+    public void addFECIOListener(FECIOListener fil) {
+        if (dispatch != null) {
+            dispatch.addListener(this,fil, FECIOListener.EVENTS);
+            return;
+        }
+        throw new IllegalStateException("No events in read-only mode");
+    }
+
+    /**
+     * Unregisters a listener previously added with addFECIOListener().
+     *
+     * @param fil The listener to remove.
+     * @throws IllegalStateException If there is no event dispatcher, which
+     * is the case when the file was opened read-only.
+     */
+    public void removeFECIOListener(FECIOListener fil) {
+        if (dispatch != null) {
+            dispatch.removeListener(this,fil, FECIOListener.EVENTS);
+            return;
+        }
+        throw new IllegalStateException("No events in read-only mode");
+    }
+
+ /**
+ * Fire an event.
+ */
+ protected void fire(FECIOEvent ev) {
+ dispatch.fire(ev,"notify");
+ }
+
+    /**
+     * @return The decoded block count.  Without a packet placement (it is
+     * only created in "rw" mode) every block counts as decoded.
+     */
+    public int getDecodedBlockCount() {
+        // No block locks are taken: this is a read-only query and pp is
+        // only ever assigned in the constructor, so the null check can not
+        // race with an assignment.  (The original author notes that the
+        // PacketPlacement is completely synchronized.)
+        if (pp == null) {
+            return blockCount;
+        }
+        return pp.getDecodedBlockCount();
+    }
+
+    /**
+     * @param blockNum The block to query.
+     * @return true if the block is decoded.  Without a packet placement (it
+     * is only created in "rw" mode) every block counts as decoded.
+     */
+    public boolean isBlockDecoded(int blockNum) {
+        // No block locks are taken: this is a read-only query and pp is
+        // only ever assigned in the constructor, so the null check can not
+        // race with an assignment.  (The original author notes that the
+        // PacketPlacement is completely synchronized.)
+        return pp == null || pp.isBlockDecoded(blockNum);
+    }
+
+    /**
+     * @param blockNum The block of the packet.
+     * @param stripeNum The stripe of the packet.
+     * @return true if the FECFile contains the specified packet, i.e. there
+     * is no packet placement, the block is decoded, or the placement knows
+     * an index for the packet.
+     */
+    public boolean containsPacket(int blockNum, int stripeNum) {
+        // No block locks are taken: this is a read-only query and pp is
+        // only ever assigned in the constructor, so the null check can not
+        // race with an assignment.
+        if (pp == null) {
+            return true;
+        }
+        // Both placement queries are made under the placement's monitor so
+        // they see a consistent state.
+        synchronized (pp) {
+            return pp.isBlockDecoded(blockNum)
+                || pp.getPacketIndex(blockNum,stripeNum) != -1;
+        }
+    }
+
+    /**
+     * @return The number of packets written to disk.  Without a packet
+     * placement (it is only created in "rw" mode) this is the unexpanded
+     * packet count of the FECParameters.
+     */
+    public int getWrittenCount() {
+        // No block locks are taken: this is a read-only query and pp is
+        // only ever assigned in the constructor, so the null check can not
+        // race with an assignment.
+        if (pp == null) {
+            return params.getUnexpandedPacketCount();
+        }
+        return pp.getWrittenCount();
+    }
+
+    /**
+     * @return The FECParameters this file was opened with; they are used
+     * for encoding/decoding.
+     */
+    public FECParameters getFECParameters() {
+        return this.params;
+    }
+
+    /**
+     * @return true if this file is decoded, which is the case exactly when
+     * the underlying file is in "r" mode.
+     */
+    public boolean isDecoded() {
+        // no locking required, raf is already synchronized.
+        final String currentMode = raf.getMode();
+        return currentMode.equals("r");
+    }
+
+    /**
+     * This method blocks and returns once the file has been decoded.  This
+     * is primarily so that you know when it is safe to close the FECFile
+     * without having to setup your own FECFileListeners to wait for the
+     * FileDecodedEvent.
+     *
+     * If the file is already decoded (this includes files opened
+     * read-only) the method returns immediately.
+     *
+     * @throws InterruptedException If the thread is interrupted while
+     * waiting; the temporary listener is removed in that case as well.
+     */
+    public void waitForFileDecoded() throws InterruptedException {
+        // Nothing to wait for.  This also covers read-only files, which
+        // have no event dispatcher to register a listener with.
+        if (isDecoded()) {
+            return;
+        }
+
+        FECIOListener fil = new FECIOListener() {
+                public void notify(FECIOEvent ev) {
+                    if (ev instanceof FileDecodedEvent) {
+                        synchronized (this) {
+                            this.notify();
+                        }
+                    }
+                }
+            };
+
+        // synch around the fil to make sure it isn't notified before we
+        // start waiting on it.
+        synchronized (fil) {
+            addFECIOListener(fil);
+            try {
+                // isDecoded() is always set to true before the event is
+                // thrown.  Re-check it in a loop to guard against wakeups
+                // that were not caused by the FileDecodedEvent.
+                while (!isDecoded()) {
+                    fil.wait();
+                }
+            } finally {
+                removeFECIOListener(fil);
+            }
+        }
+    }
+
+ /**
+ * Background worker that decodes blocks.  As a FECIOListener it queues
+ * every PacketWrittenEvent it is notified about; as a Runnable it takes
+ * the events off the queue and, once a block has enough packets, tries
+ * combinations of the written stripes until tryDecode() succeeds.  When
+ * all blocks are decoded it truncates the file, switches it to read-only
+ * and fires a FileDecodedEvent.
+ */
+ public class Decoder implements Runnable, FECIOListener {
+
+ // Pending PacketWrittenEvents; also used as the monitor for done.
+ LinkedList queue = new LinkedList();
+ // Set by close(); makes run() return.
+ boolean done = false;
+
+ /**
+ * Queues PacketWrittenEvents for the decoding thread and wakes it up.
+ * All other events are ignored.
+ */
+ public void notify(FECIOEvent ev) {
+ if (ev instanceof PacketWrittenEvent) {
+ synchronized (queue) {
+ queue.add(ev);
+ queue.notify();
+ }
+ }
+ }
+
+ /**
+ * Stops the decoder: unregisters it as a listener, drops all queued
+ * events and wakes up the decoding thread so it can terminate.
+ * Calling it more than once has no further effect.
+ */
+ public void close() {
+ synchronized (queue) {
+ // close already called.
+ if (done) {
+ return;
+ }
+ removeFECIOListener(this);
+ done = true;
+ queue.clear();
+ queue.notify();
+ }
+ }
+
+ /**
+ * Generates the next combo for decoding, i.e. advances combo to the
+ * next combination (in ascending order) of combo.length indexes chosen
+ * from 0..n-1.
+ * @param combo (in/out) Modified to provide the next combo.
+ * @param n The number of elements in the set to choose the combo from.
+ *
+ * @return false if there are no more combos to generate.
+ */
+ private final boolean nextCombo(int[] combo, int n) {
+ // loop through the list from back to front generating combos
+ for (int i=combo.length-1;i>=0;i--) {
+ // search for the next.
+ // (n-combo.length)+i is the largest value position i may hold.
+ if (combo[i] != (n-combo.length)+i) {
+ combo[i]++;
+ // reset the following positions to consecutive values.
+ for (int j=i+1;j<combo.length;j++) {
+ combo[j] = combo[j-1]+1;
+ }
+ return true;
+ }
+ }
+ return false;
+ }
+
+ /**
+ * Decoding loop; runs until close() is called, the thread is
+ * interrupted while waiting, or the whole file has been decoded.
+ */
+ public void run() {
+ PacketWrittenEvent ev = null;
+
+ while (true) {
+ // Fetch the next event, waiting on the queue if there is none.
+ synchronized (queue) {
+ if (done) {
+ return;
+ } else if (queue.isEmpty()) {
+ try {
+ queue.wait();
+ } catch (InterruptedException e) {
+ return;
+ }
+ continue;
+ } else {
+ ev = (PacketWrittenEvent) queue.removeFirst();
+ }
+ }
+
+ // In most cases here no locks are necessary because this is
+ // the only thread that can cause decoding to occur and we
+ // only get information that doesn't change depending on the
+ // actions of other threads.
+
+ int blockNum = ev.getBlockNum();
+ int blockPacketCount = ev.getBlockPacketCount();
+
+ // Number of packets of this block that are padding.
+ int paddingCount = k-params.getUnexpandedPacketCount(blockNum);
+
+ if (blockPacketCount < k-paddingCount) {
+ // not enough packets.
+ continue;
+ } else if (pp.isBlockDecoded(blockNum)) {
+ // already decoded.
+ // Don't throw an event because we already threw a
+ // PacketWrittenEvent.
+ continue;
+ }
+
+ // Only get up to blockPacketCount stripes because we use
+ // an iterative combination algorithm.
+ int[] possibleStripes = pp.getStripeNums(blockNum,
+ blockPacketCount);
+ int[] stripeNums = new int[k];
+
+ // The last written packet is in every combo.
+ stripeNums[k-paddingCount-1] = possibleStripes
+ [possibleStripes.length-1];
+
+ // The padding packets are in every combo.
+ for (int i=k-paddingCount;i<k;i++) {
+ stripeNums[i] = i;
+ }
+
+ // initialize the combos
+ int[] combo = new int[k-paddingCount-1];
+ for (int i=0;i<combo.length;i++) {
+ combo[i] = i;
+ }
+
+ // Try combos until one decodes and verifies, or none are left.
+ do {
+ // copy the possibleStripes into the stripeNums using the
+ // given combo.
+ for (int i=0;i<combo.length;i++) {
+ stripeNums[i] = possibleStripes[combo[i]];
+ }
+
+ StringBuffer sb = new StringBuffer("stripeNums="+
+ stripeNums[0]);
+ for (int i=1;i<stripeNums.length;i++) {
+ sb.append(","+stripeNums[i]);
+ }
+ cat.debug(sb);
+
+ try {
+ if (tryDecode(blockNum,stripeNums)) {
+ break;
+ }
+ } catch (IOException e) {
+ // Report the problem and carry on with the next combo.
+ getDecodeExceptionHandler().handleException
+ (new ExceptionEvent(FECFile.this,e));
+ }
+ } while (nextCombo(combo,blockPacketCount-1));
+
+ try {
+ // It should be safe to check getDecodedBlockCount() w/o
+ // a lock because this is the only thread that can modify
+ // that value.
+ if (pp.getDecodedBlockCount() == params.getBlockCount()) {
+ // download and decoding is complete, clean up, move
+ // the file and reopen it in read-only mode.
+ cat.debug("File Decoded, switching to read-only");
+
+ acquireAllWriteLocks();
+ try {
+ // Download and decoding is complete.
+ raf.setLength(params.getFileSize());
+ raf.setReadOnly();
+ } finally {
+ releaseAllWriteLocks();
+ }
+ fire(new FileDecodedEvent(FECFile.this));
+
+ // All done decoding, clean up and return.
+ // NOTE: this unqualified call resolves to Decoder.close(),
+ // not FECFile.close(); the file itself stays open.
+ close();
+ return;
+ }
+ } catch (Exception e) {
+ getDecodeExceptionHandler().handleException
+ (new ExceptionEvent(FECFile.this,e));
+ }
+ }
+ }
+ }
+}
Added: trunk/contrib/fec_src/com/onionnetworks/fec/io/FECIOEvent.java
===================================================================
--- trunk/contrib/fec_src/com/onionnetworks/fec/io/FECIOEvent.java
2006-09-26 20:10:33 UTC (rev 10513)
+++ trunk/contrib/fec_src/com/onionnetworks/fec/io/FECIOEvent.java
2006-09-26 23:18:01 UTC (rev 10514)
@@ -0,0 +1,17 @@
+package com.onionnetworks.fec.io;
+
+import java.util.*;
+
+/**
+ * Common base class of all events related to FEC IO.
+ *
+ * (c) Copyright 2001 Onion Networks
+ * (c) Copyright 2000 OpenCola
+ *
+ * @author Justin F. Chapweske (justin at chapweske.com)
+ */
+public class FECIOEvent extends EventObject {
+
+    /**
+     * @param source The object on which the event initially occurred.
+     */
+    public FECIOEvent(Object source) {
+        super(source);
+    }
+}
Added: trunk/contrib/fec_src/com/onionnetworks/fec/io/FECIOListener.java
===================================================================
--- trunk/contrib/fec_src/com/onionnetworks/fec/io/FECIOListener.java
2006-09-26 20:10:33 UTC (rev 10513)
+++ trunk/contrib/fec_src/com/onionnetworks/fec/io/FECIOListener.java
2006-09-26 23:18:01 UTC (rev 10514)
@@ -0,0 +1,21 @@
+package com.onionnetworks.fec.io;
+
+import java.util.*;
+
+/**
+ * Implemented by objects that want to be told about FECIOEvents.
+ *
+ * (c) Copyright 2001 Onion Networks
+ * (c) Copyright 2000 OpenCola
+ *
+ * @author Justin F. Chapweske (justin at chapweske.com)
+ */
+public interface FECIOListener extends EventListener {
+
+    /** Name of the callback method. */
+    String NOTIFY = "notify";
+
+    /** The names of all callback methods of this listener type. */
+    String[] EVENTS = { NOTIFY };
+
+    /**
+     * Called with each event delivered to this listener.
+     *
+     * @param ev The event that occurred.
+     */
+    void notify(FECIOEvent ev);
+}
+
Added: trunk/contrib/fec_src/com/onionnetworks/fec/io/FECParameters.java
===================================================================
--- trunk/contrib/fec_src/com/onionnetworks/fec/io/FECParameters.java
2006-09-26 20:10:33 UTC (rev 10513)
+++ trunk/contrib/fec_src/com/onionnetworks/fec/io/FECParameters.java
2006-09-26 23:18:01 UTC (rev 10514)
@@ -0,0 +1,248 @@
+package com.onionnetworks.fec.io;
+
+import com.onionnetworks.util.Util;
+
+/**
+ * This class contains all of the functions for performing packet, block, and
+ * stripe calculations given a set of FEC parameters. Most important are the
+ * boundary conditions which are difficult to keep straight.
+ *
+ * FECParameters objects are immutable, therefore they may be safely used
+ * without synchronization.
+ *
+ * (c) Copyright 2001 Onion Networks
+ * (c) Copyright 2000 OpenCola
+ *
+ * @author Justin F. Chapweske (justin at chapweske.com)
+ * @author Ry4an Brase (ry4an at ry4an.org)
+ */
+public class FECParameters {
+ // firstGapStripe: first stripe of the last block with no file data (k if none).
+ protected final int n,k,packetSize,numBlocks,firstGapStripe;
+ protected final long fileSize;
+
+ /**
+ * Creates a new FECParameters instance.
+ *
+ * @param k The number of vanilla packets per block.
+ * @param n The number of packets the k vanilla packets can be expanded to.
+ * @param packetSize The size of each packet.
+ * @param fileSize The size of the original, unencoded file.
+ * @throws IllegalArgumentException if any argument is <= 0 or n < k.
+ */
+ public FECParameters (int k, int n, int packetSize, long fileSize) {
+ if (k <= 0 || n < k || packetSize <= 0 || fileSize <= 0) {
+ throw new IllegalArgumentException
+ ("Argument is <= 0 or n < k :"+"k="+k+",n="+n+
+ ",packetSize="+packetSize+",fileSize="+fileSize);
+ }
+
+ this.k = k;
+ this.n = n;
+ this.packetSize = packetSize;
+ this.fileSize = fileSize;
+
+ // Round up after division.
+ numBlocks = Util.divideCeil(fileSize,packetSize*k);
+
+ firstGapStripe = Util.divideCeil(getUnexpandedBlockSize(numBlocks-1),
+ packetSize);
+ }
+
+ /**
+ * @return The size of the original, unencoded file.
+ */
+ public long getFileSize() {
+ return fileSize;
+ }
+
+ /**
+ * @return k, The number of vanilla packets per block.
+ */
+ public int getK() {
+ return k;
+ }
+
+ /**
+ * @return n, The number of packets that the k vanilla packets can be
+ * expanded to.
+ */
+ public int getN() {
+ return n;
+ }
+
+ /**
+ * @return The maximum (and default) size that most packets will be.
+ */
+ public int getPacketSize() {
+ return packetSize;
+ }
+
+ /**
+ * @return The maximum size (in bytes) that a stripe will be.
+ */
+ public int getMaxStripeSize() {
+ return packetSize*numBlocks;
+ }
+
+ /**
+ * @return The default number of vanilla bytes that a block will contain.
+ */
+ public int getUnexpandedBlockSize() {
+ return k*packetSize;
+ }
+
+ /**
+ * @return The maximum number of bytes that a fully encoded block can
+ * contain.
+ */
+ public int getExpandedBlockSize() {
+ return n*packetSize;
+ }
+
+ /**
+ * @return The number of blocks that this file will be partitioned into.
+ */
+ public int getBlockCount() {
+ return numBlocks;
+ }
+
+ /**
+ * @return The maximum number of stripes (N) that can be created from this
+ * file.
+ */
+ public int getNumStripes() {
+ return n;
+ }
+
+ /**
+ * @return The number of packets required to send across the original
+ * file. Also the minimum number of packets required to recreate the
+ * original file.
+ */
+ public int getUnexpandedPacketCount() {
+ return Util.divideCeil(fileSize,packetSize);
+ }
+
+ /**
+ * @param blockNum The blockNum for which to count packets.
+ *
+ * @return The number of packets required to send across the original
+ * block.
+ */
+ public int getUnexpandedPacketCount(int blockNum) {
+ return Util.divideCeil(getUnexpandedBlockSize(blockNum),packetSize);
+ }
+
+ /**
+ * @param whichStripe The stripe for which we are counting packets.
+ *
+ * @return The number of packets in <code>whichStripe</code>. Most of the
+ * time this will be equal to <code>numBlocks</code>, but if this stripe
+ * lands on a gap in the last block then it will contain <code>numBlocks-1
+ * </code>
+ */
+ public int getStripePacketCount(int whichStripe) {
+ if (whichStripe >= firstGapStripe && whichStripe < k) {
+ return numBlocks-1;
+ } else {
+ return numBlocks;
+ }
+ }
+
+ /**
+ * @param whichBlock The block which the packet is in.
+ * @param whichStripe The stripe which the packet is in, or in other words
+ * the index of the packet within <code>whichBlock</code>
+ *
+ * @return The size of the packet, normally <code>getPacketSize()</code>.
+ * In the last block a packet in the gap between the end of the file and K
+ * has size 0, and the last packet of the file (the one right before the
+ * gap) holds the remaining bytes: a full packet when fileSize is an exact
+ * multiple of packetSize, otherwise <code>fileSize % packetSize</code>.
+ */
+ public int getPacketSize(int whichBlock, int whichStripe) {
+ if (whichBlock == numBlocks-1) {
+ if (whichStripe >= firstGapStripe && whichStripe < k) {
+ return 0;
+ }
+ if (whichStripe == firstGapStripe-1) {
+ int tail = (int) (fileSize % packetSize);
+ return (tail == 0) ? packetSize : tail;
+ }
+ }
+ return packetSize;
+ }
+
+ /**
+ * @param whichStripe The stripe for which we are finding the size of.
+ *
+ * @return The size of the stripe (in bytes). Normally the stripe size
+ * will simply be <code>packetSize*numBlocks</code> (computed as a long).
+ * But if the stripe falls on a gap it will be less.
+ */
+ public long getStripeSize(int whichStripe) {
+ if (getStripePacketCount(whichStripe) == numBlocks-1) {
+ return ((long) packetSize)*(numBlocks-1);
+ } else if (whichStripe == firstGapStripe-1) {
+ if (fileSize % packetSize != 0) {
+ return (((long) packetSize)*(numBlocks-1)) + (fileSize % packetSize);
+ }
+ }
+
+ return ((long) packetSize)*numBlocks;
+ }
+
+ /**
+ * @param whichBlock The block for which we are finding the size of.
+ *
+ * @return The size of the unexpanded block. Normally this will simply
+ * be <code>k*packetSize</code>. But if this is the last block then
+ * it may be less.
+ */
+ public int getUnexpandedBlockSize(int whichBlock) {
+ if (whichBlock == numBlocks-1) {
+ if (numBlocks == 1) {
+ return (int) fileSize;
+ } else {
+ return ((int) fileSize) - (k*(numBlocks-1)*packetSize);
+ }
+ }
+ return k*packetSize;
+ }
+
+ /**
+ * Padding packets are empty packets that should never be read or
+ * written and shouldn't be sent across the network (they are all '0's)
+ *
+ * @param blockNum The blockNum of the packet to check
+ * @param stripeNum The stripeNum of the packet to check
+ *
+ * @return true if this packet is a padding packet.
+ */
+ public boolean isPaddingPacket(int blockNum, int stripeNum) {
+ return blockNum == numBlocks-1 &&
+ stripeNum >= getUnexpandedPacketCount(blockNum) &&
+ stripeNum < k;
+ }
+ // Mixes exactly the fields compared by equals(); a plain product loses bits.
+ public int hashCode() {
+ return 31*(31*(31*k + n) + packetSize) + (int) (fileSize ^ (fileSize >>> 32));
+ }
+
+ public boolean equals(Object obj) {
+ if (obj instanceof FECParameters) {
+ FECParameters f2 = (FECParameters) obj;
+ if (f2.k == k && f2.n == n && f2.fileSize == fileSize &&
+ f2.packetSize == packetSize) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ public String toString() {
+ return "FECParameters(k="+k+",n="+n+",packetSize="+packetSize+
+ ",fileSize="+fileSize+")";
+ }
+}
Added:
trunk/contrib/fec_src/com/onionnetworks/fec/io/FileAlreadyDecodedException.java
===================================================================
---
trunk/contrib/fec_src/com/onionnetworks/fec/io/FileAlreadyDecodedException.java
2006-09-26 20:10:33 UTC (rev 10513)
+++
trunk/contrib/fec_src/com/onionnetworks/fec/io/FileAlreadyDecodedException.java
2006-09-26 23:18:01 UTC (rev 10514)
@@ -0,0 +1,23 @@
+package com.onionnetworks.fec.io;
+
+import java.io.IOException;
+
+/**
+ * This exception signals that there was an attempt to write a packet to a
+ * file that has already been completely decoded.
+ *
+ * (c) Copyright 2001 Onion Networks
+ * (c) Copyright 2000 OpenCola
+ *
+ * @author Justin F. Chapweske (justin at chapweske.com)
+ */
+public class FileAlreadyDecodedException extends IOException {
+ // Creates the exception without a detail message.
+ public FileAlreadyDecodedException() {
+ super();
+ }
+ // Creates the exception with msg as its detail message.
+ public FileAlreadyDecodedException(String msg) {
+ super(msg);
+ }
+}
Added: trunk/contrib/fec_src/com/onionnetworks/fec/io/FileDecodedEvent.java
===================================================================
--- trunk/contrib/fec_src/com/onionnetworks/fec/io/FileDecodedEvent.java
2006-09-26 20:10:33 UTC (rev 10513)
+++ trunk/contrib/fec_src/com/onionnetworks/fec/io/FileDecodedEvent.java
2006-09-26 23:18:01 UTC (rev 10514)
@@ -0,0 +1,17 @@
+package com.onionnetworks.fec.io;
+
+/**
+ * This event signifies that the file has been completely decoded.
+ *
+ * (c) Copyright 2001 Onion Networks
+ * (c) Copyright 2000 OpenCola
+ *
+ * @author Justin F. Chapweske (justin at chapweske.com)
+ */
+public class FileDecodedEvent extends FECIOEvent {
+ // Carries no data beyond the event source passed to FECIOEvent.
+ public FileDecodedEvent(Object source) {
+ super(source);
+ }
+}
+
Added: trunk/contrib/fec_src/com/onionnetworks/fec/io/PacketEntryAddedEvent.java
===================================================================
--- trunk/contrib/fec_src/com/onionnetworks/fec/io/PacketEntryAddedEvent.java
2006-09-26 20:10:33 UTC (rev 10513)
+++ trunk/contrib/fec_src/com/onionnetworks/fec/io/PacketEntryAddedEvent.java
2006-09-26 23:18:01 UTC (rev 10514)
@@ -0,0 +1,27 @@
+package com.onionnetworks.fec.io;
+
+/**
+ * This event signifies that a new packet entry has been added.
+ *
+ * (c) Copyright 2001 Onion Networks
+ * (c) Copyright 2000 OpenCola
+ *
+ * @author Justin F. Chapweske (justin at chapweske.com)
+ */
+public class PacketEntryAddedEvent extends FECIOEvent {
+ // Value given to the constructor; package-private and not final.
+ int packetIndex;
+ // Stores packetIndex unchanged; source is passed on to FECIOEvent.
+ public PacketEntryAddedEvent(Object source, int packetIndex) {
+ super(source);
+ this.packetIndex = packetIndex;
+ }
+
+ /**
+ * @return the packetIndex supplied when this event was constructed
+ */
+ public int getPacketIndex() {
+ return packetIndex;
+ }
+}
+
Added: trunk/contrib/fec_src/com/onionnetworks/fec/io/PacketInfo.java
===================================================================
--- trunk/contrib/fec_src/com/onionnetworks/fec/io/PacketInfo.java
2006-09-26 20:10:33 UTC (rev 10513)
+++ trunk/contrib/fec_src/com/onionnetworks/fec/io/PacketInfo.java
2006-09-26 23:18:01 UTC (rev 10514)
@@ -0,0 +1,41 @@
+package com.onionnetworks.fec.io;
+
+/**
+ * This class provides a (blockNum,stripeNum) tuple that is comparable for
+ * equality, so that it can serve as a key in hash-based collections.
+ *
+ * (c) Copyright 2001 Onion Networks
+ * (c) Copyright 2000 OpenCola
+ *
+ * @author Justin F. Chapweske (justin at chapweske.com)
+ */
+public class PacketInfo {
+ // The two coordinates; equals() and hashCode() are derived from both.
+ int blockNum, stripeNum;
+
+ public PacketInfo(int blockNum, int stripeNum) {
+ this.blockNum = blockNum;
+ this.stripeNum = stripeNum;
+ }
+
+ public int getBlockNum() {
+ return blockNum;
+ }
+
+ public int getStripeNum() {
+ return stripeNum;
+ }
+ // A product would hash every packet of block 0 or stripe 0 to 0; mix instead.
+ public int hashCode() {
+ return 31*blockNum + stripeNum;
+ }
+ // Equal only to another PacketInfo with the same blockNum and stripeNum.
+ public boolean equals(Object obj) {
+ if (obj instanceof PacketInfo &&
+ ((PacketInfo) obj).blockNum == blockNum &&
+ ((PacketInfo) obj).stripeNum == stripeNum) {
+ return true;
+ }
+ return false;
+ }
+}
Added:
trunk/contrib/fec_src/com/onionnetworks/fec/io/PacketNotFoundException.java
===================================================================
--- trunk/contrib/fec_src/com/onionnetworks/fec/io/PacketNotFoundException.java
2006-09-26 20:10:33 UTC (rev 10513)
+++ trunk/contrib/fec_src/com/onionnetworks/fec/io/PacketNotFoundException.java
2006-09-26 23:18:01 UTC (rev 10514)
@@ -0,0 +1,39 @@
+package com.onionnetworks.fec.io;
+
+import java.io.IOException;
+
+/**
+ * This exception signals that there was an attempt to read a packet that
+ * can't be found, or created from the data on disk.
+ *
+ * (c) Copyright 2001 Onion Networks
+ * (c) Copyright 2000 OpenCola
+ *
+ * @author Justin F. Chapweske (justin at chapweske.com)
+ */
+public class PacketNotFoundException extends IOException {
+ // Coordinates of the packet that could not be found.
+ int blockNum, stripeNum;
+ // No detail message; only the packet coordinates are recorded.
+ public PacketNotFoundException(int blockNum, int stripeNum) {
+ super();
+
+ this.blockNum = blockNum;
+ this.stripeNum = stripeNum;
+ }
+ // Records the packet coordinates and uses msg as the detail message.
+ public PacketNotFoundException(String msg, int blockNum, int stripeNum) {
+ super(msg);
+
+ this.blockNum = blockNum;
+ this.stripeNum = stripeNum;
+ }
+ // Accessors for the coordinates given at construction time.
+ public int getBlockNum() {
+ return blockNum;
+ }
+
+ public int getStripeNum() {
+ return stripeNum;
+ }
+}
Added: trunk/contrib/fec_src/com/onionnetworks/fec/io/PacketPlacement.java
===================================================================
--- trunk/contrib/fec_src/com/onionnetworks/fec/io/PacketPlacement.java
2006-09-26 20:10:33 UTC (rev 10513)
+++ trunk/contrib/fec_src/com/onionnetworks/fec/io/PacketPlacement.java
2006-09-26 23:18:01 UTC (rev 10514)
@@ -0,0 +1,232 @@
+package com.onionnetworks.fec.io;
+
+import java.util.*;
+import org.apache.log4j.Category;
+
+/**
+ * This class allocates and tracks how packets are written to disk. It
+ * is fully synchronized to safely support multi-threaded access. Its
+ * data structures and operations are fairly well optimized.
+ *
+ * (c) Copyright 2001 Onion Networks
+ * (c) Copyright 2000 OpenCola
+ *
+ * @author Justin F. Chapweske (justin at chapweske.com)
+ */
+public class PacketPlacement {
+
+ public static final short DECODED_BLOCK = -1;
+
+ static Category cat = Category.getInstance(PacketPlacement.class.
+ getName());
+
+ FECParameters params;
+
+ int decodedBlockCount, totalPacketCount, extEntryCount;
+
+ // Tracks the number of packets in each block, if the block is decoded then
+ // it is set to DECODED_BLOCK.
+ short[] packetCount;
+
+ // List of all packets for each block in order written.
+ ArrayList[] entries;
+
+ // Maps block/stripe to packetIndexes.
+ HashMap[] revEntries;
+
+ int k, blockCount;
+
+
+ /**
+ * Creates a new PacketPlacement object.
+ *
+ * @param params The FECParameters for this download.
+ */
+ public PacketPlacement(FECParameters params) {
+
+ this.params = params;
+
+ k = params.getK();
+ blockCount = params.getBlockCount();
+
+ packetCount = new short[blockCount];
+ entries = new ArrayList[blockCount];
+ revEntries = new HashMap[blockCount];
+ }
+
+ /**
+ * @return the number of blocks that have been decoded and written
+ * back to disk.
+ */
+ public synchronized int getDecodedBlockCount() {
+ return decodedBlockCount;
+ }
+
+ /**
+ * Signify that this block has been decoded.
+ *
+ * @param blockNum The blockNum of the decoded block.
+ */
+ public synchronized void setBlockDecoded(int blockNum) {
+ if (packetCount[blockNum] == DECODED_BLOCK) {
+ throw new IllegalStateException
+ ("This block is already decoded:"+blockNum);
+ }
+ // This block has been decoded.
+ packetCount[blockNum] = DECODED_BLOCK;
+ decodedBlockCount++;
+
+ // Update the forward and reverse entries
+ entries[blockNum] = null;
+ revEntries[blockNum] = null;
+ }
+
+ /**
+ * @param blockNum The blockNum of the block to check.
+ * @return true if the block has been decoded.
+ */
+ public synchronized boolean isBlockDecoded(int blockNum) {
+ return packetCount[blockNum] == DECODED_BLOCK;
+ }
+
+ /**
+ * @param blockNum the blockNum for which to count packets.
+ *
+ * @return The number of packets in the block.
+ * PacketPlacement.DECODED_BLOCK (-1) if the block is decoded.
+ */
+ public synchronized int getPacketCount(int blockNum) {
+ return packetCount[blockNum];
+ }
+
+ /**
+ * @return The total number of packets that have been written to disk.
+ */
+ public synchronized int getWrittenCount() {
+ return totalPacketCount;
+ }
+
+ /**
+ * Perform a reverse lookup on the index to find the index of a specific
+ * packet.
+ *
+ * @param blockNum The blockNum of the packet.
+ * @param stripeNum The stripeNum of the packet.
+ *
+ * @return the packetIndex if the packet is on disk, else -1
+ */
+ public synchronized int getPacketIndex(int blockNum, int stripeNum) {
+ // If its decoded then the vanilla packets are on disk.
+ if (isBlockDecoded(blockNum)) {
+ return (stripeNum >= 0 && stripeNum < k) ?
+ blockNum*k+stripeNum : -1;
+ }
+
+ HashMap h = revEntries[blockNum];
+
+ // its not decoded, so a null h means that there are no packets.
+ if (h != null) {
+ Integer pi = (Integer) h.get(new Integer(stripeNum));
+ if (pi != null) {
+ return pi.intValue();
+ }
+ }
+ return -1;
+ }
+
+ /**
+ * Add a new entry to an available slot. The first k entries of a block
+ * use that block's own slots; later ones are appended after all blocks.
+ *
+ * @param blockNum The blockNum of the entry.
+ * @param stripeNum the stripeNum of the entry.
+ *
+ * @return the packetIndex which the data will be written to.
+ *
+ * @throws IllegalStateException When the block has already been decoded.
+ * @throws IllegalArgumentException When there is an attempt to add the
+ * same (blockNum,stripeNum) entry a second time.
+ */
+ public synchronized int addPacketEntry(int blockNum, int stripeNum) {
+
+ // block already decoded
+ if (isBlockDecoded(blockNum)) {
+ throw new IllegalStateException
+ ("block already decoded, blockNum="+blockNum+",stripeNum="+
+ stripeNum);
+ }
+
+ // duplicate packet.
+ if (getPacketIndex(blockNum,stripeNum) != -1) {
+ throw new IllegalArgumentException
+ ("Duplicate entry for blockNum="+blockNum+",stripeNum="+
+ stripeNum+",packetIndex="+getPacketIndex(blockNum,stripeNum));
+ }
+
+ short blockPacketCount = (short) getPacketCount(blockNum);
+ int packetIndex = -1;
+
+ ArrayList blockEntries = entries[blockNum];
+
+ // first packet for this block.
+ if (blockEntries == null) {
+ blockEntries = new ArrayList();
+ entries[blockNum] = blockEntries;
+ }
+
+ // Add the entry.
+ blockEntries.add(new Integer(stripeNum));
+
+ // check to see if this'll be an extended entry.
+ if (blockPacketCount < k) {
+ // regular entry in the block's area on disk.
+ packetIndex = blockNum*k+blockPacketCount;
+ } else {
+ // extended entry at the end of the file.
+ packetIndex = blockCount*k+extEntryCount;
+ extEntryCount++;
+ }
+
+ HashMap revBlockEntries = revEntries[blockNum];
+ if (revBlockEntries == null) {
+ revBlockEntries = new HashMap();
+ revEntries[blockNum] = revBlockEntries;
+ }
+ revBlockEntries.put(new Integer(stripeNum),
+ new Integer(packetIndex));
+
+ packetCount[blockNum]++;
+ totalPacketCount++;
+
+ return packetIndex;
+ }
+
+ /**
+ * Returns the list of stripes available for a specific blockNum. These
+ * stripes will be in the order that they were written on disk.
+ * @param blockNum The blockNum for which you want stripes.
+ * @return an int[] of the stripes found for that blockNum; it is empty
+ * when the block has no entries (nothing added yet, or already decoded).
+ */
+ public synchronized int[] getStripeNums(int blockNum) {
+ ArrayList stripes = entries[blockNum];
+ return getStripeNums(blockNum, (stripes == null) ? 0 : stripes.size());
+ }
+
+ /**
+ * Returns the list of stripes available for a specific blockNum. These
+ * stripes will be in the order that they were written on disk.
+ *
+ * @param blockNum The blockNum for which you want stripes.
+ * @param count The number of stripes to return.
+ *
+ * @return an int[] containing the stripes found for that blockNum.
+ */
+ public synchronized int[] getStripeNums(int blockNum, int count) {
+ int[] result = new int[count];
+ for (int i=0;i<result.length;i++) {
+ result[i] = ((Integer) entries[blockNum].get(i)).intValue();
+ }
+ return result;
+ }
+}
Added: trunk/contrib/fec_src/com/onionnetworks/fec/io/PacketWrittenEvent.java
===================================================================
--- trunk/contrib/fec_src/com/onionnetworks/fec/io/PacketWrittenEvent.java
2006-09-26 20:10:33 UTC (rev 10513)
+++ trunk/contrib/fec_src/com/onionnetworks/fec/io/PacketWrittenEvent.java
2006-09-26 23:18:01 UTC (rev 10514)
@@ -0,0 +1,48 @@
+package com.onionnetworks.fec.io;
+
+import java.util.*;
+
+/**
+ * This event signifies that a new packet was received and written to disk.
+ *
+ * (c) Copyright 2001 Onion Networks
+ * (c) Copyright 2000 OpenCola
+ *
+ * @author Justin F. Chapweske (justin at chapweske.com)
+ */
+public class PacketWrittenEvent extends FECIOEvent {
+ // Constructor arguments, stored unchanged and exposed by the getters.
+ int blockNum,stripeNum,blockPacketCount;
+ // Records the three values; source is passed on to FECIOEvent.
+ public PacketWrittenEvent(Object source, int blockNum, int stripeNum,
+ int blockPacketCount) {
+ super(source);
+ this.blockNum = blockNum;
+ this.stripeNum = stripeNum;
+ this.blockPacketCount = blockPacketCount;
+ }
+
+ /**
+ * @return the blockNum of the packet just written.
+ */
+ public int getBlockNum() {
+ return blockNum;
+ }
+
+ /**
+ * @return the stripeNum of the packet just written.
+ */
+ public int getStripeNum() {
+ return stripeNum;
+ }
+
+ /**
+ * @return the blockPacketCount for this write. This value is used to
+ * tell how many packets have been written to disk at this time. Remember
+ * that more packets can be written to disk than need be for repair.
+ */
+ public int getBlockPacketCount() {
+ return blockPacketCount;
+ }
+}
+