Author: toad
Date: 2006-09-26 23:18:01 +0000 (Tue, 26 Sep 2006)
New Revision: 10514

Added:
   trunk/contrib/fec_src/com/onionnetworks/fec/io/
   
trunk/contrib/fec_src/com/onionnetworks/fec/io/BlockAlreadyDecodedException.java
   trunk/contrib/fec_src/com/onionnetworks/fec/io/BlockDecodedEvent.java
   trunk/contrib/fec_src/com/onionnetworks/fec/io/DuplicatePacketException.java
   trunk/contrib/fec_src/com/onionnetworks/fec/io/FECFile.java
   trunk/contrib/fec_src/com/onionnetworks/fec/io/FECIOEvent.java
   trunk/contrib/fec_src/com/onionnetworks/fec/io/FECIOListener.java
   trunk/contrib/fec_src/com/onionnetworks/fec/io/FECParameters.java
   
trunk/contrib/fec_src/com/onionnetworks/fec/io/FileAlreadyDecodedException.java
   trunk/contrib/fec_src/com/onionnetworks/fec/io/FileDecodedEvent.java
   trunk/contrib/fec_src/com/onionnetworks/fec/io/PacketEntryAddedEvent.java
   trunk/contrib/fec_src/com/onionnetworks/fec/io/PacketInfo.java
   trunk/contrib/fec_src/com/onionnetworks/fec/io/PacketNotFoundException.java
   trunk/contrib/fec_src/com/onionnetworks/fec/io/PacketPlacement.java
   trunk/contrib/fec_src/com/onionnetworks/fec/io/PacketWrittenEvent.java
Log:


Added: 
trunk/contrib/fec_src/com/onionnetworks/fec/io/BlockAlreadyDecodedException.java
===================================================================
--- 
trunk/contrib/fec_src/com/onionnetworks/fec/io/BlockAlreadyDecodedException.java
    2006-09-26 20:10:33 UTC (rev 10513)
+++ 
trunk/contrib/fec_src/com/onionnetworks/fec/io/BlockAlreadyDecodedException.java
    2006-09-26 23:18:01 UTC (rev 10514)
@@ -0,0 +1,59 @@
+package com.onionnetworks.fec.io;
+
+import java.io.IOException;
+
+/**
+ * This exception signals that there was an attempt to write a packet to a
+ * block that has already been decoded. 
+ *  
+ * (c) Copyright 2001 Onion Networks
+ * (c) Copyright 2000 OpenCola
+ *
+ * @author Justin F. Chapweske (justin at chapweske.com)
+ */
+public class BlockAlreadyDecodedException extends IOException {
+    
+    int blockNum, stripeNum;
+
+    /**
+     * Constructs a new Exception
+     *
+     * @param blockNum The blockNum of the attempted packet
+     * @param stripeNum The stripeNum of the attempted packet
+     */
+    public BlockAlreadyDecodedException(int blockNum, int stripeNum) {
+        super();
+
+        this.blockNum = blockNum;
+        this.stripeNum = stripeNum;
+    }
+
+    /**
+     * Constructs a new Exception
+     *
+     * @param blockNum The blockNum of the attempted packet
+     * @param stripeNum The stripeNum of the attempted packet
+     * @param msg An informational message to include
+     */
+    public BlockAlreadyDecodedException(String msg, int blockNum, 
+                                        int stripeNum) {
+        super(msg);
+
+        this.blockNum = blockNum;
+        this.stripeNum = stripeNum;
+    }
+
+    /**
+     * @return The blockNum of the attempted packet.
+     */
+    public int getBlockNum() {
+        return blockNum;
+    }
+
+    /** 
+     * @return the stripeNum of the attempted packet.
+     */
+    public int getStripeNum() {
+        return stripeNum;
+    }
+}

Added: trunk/contrib/fec_src/com/onionnetworks/fec/io/BlockDecodedEvent.java
===================================================================
--- trunk/contrib/fec_src/com/onionnetworks/fec/io/BlockDecodedEvent.java       
2006-09-26 20:10:33 UTC (rev 10513)
+++ trunk/contrib/fec_src/com/onionnetworks/fec/io/BlockDecodedEvent.java       
2006-09-26 23:18:01 UTC (rev 10514)
@@ -0,0 +1,27 @@
+package com.onionnetworks.fec.io;
+
+/**
+ * This event signifies that a complete block was decoded.
+ *
+ * (c) Copyright 2001 Onion Networks
+ * (c) Copyright 2000 OpenCola
+ *
+ * @author Justin F. Chapweske (justin at chapweske.com)
+ */
+public class BlockDecodedEvent extends FECIOEvent {
+
+    int blockNum;
+
+    public BlockDecodedEvent(Object source, int blockNum) {
+       super(source);
+       this.blockNum = blockNum;
+    }
+
+    /**
+     * @return the blockNum of the block that was decoded.
+     */
+    public int getBlockNum() {
+       return blockNum;
+    }
+}
+       

Added: 
trunk/contrib/fec_src/com/onionnetworks/fec/io/DuplicatePacketException.java
===================================================================
--- 
trunk/contrib/fec_src/com/onionnetworks/fec/io/DuplicatePacketException.java    
    2006-09-26 20:10:33 UTC (rev 10513)
+++ 
trunk/contrib/fec_src/com/onionnetworks/fec/io/DuplicatePacketException.java    
    2006-09-26 23:18:01 UTC (rev 10514)
@@ -0,0 +1,56 @@
+package com.onionnetworks.fec.io;
+
+import java.io.IOException;
+
+/**
+ * This exception signifies an attempt to write a duplicate packet.
+ *
+ * (c) Copyright 2001 Onion Networks
+ * (c) Copyright 2000 OpenCola
+ *
+ * @author Justin F. Chapweske (justin at chapweske.com)
+ */
+public class DuplicatePacketException extends IOException {
+    
+    int blockNum, stripeNum, packetIndex;
+
+    public DuplicatePacketException(int blockNum, int stripeNum, 
+                                    int packetIndex) {
+        super();
+
+        this.blockNum = blockNum;
+        this.stripeNum = stripeNum;
+        this.packetIndex = packetIndex;
+    }
+
+    public DuplicatePacketException(String msg, int blockNum, int stripeNum,
+                                    int packetIndex) {
+        super(msg);
+
+        this.blockNum = blockNum;
+        this.stripeNum = stripeNum;
+        this.packetIndex = packetIndex;
+    }
+
+    /**
+     * @return the blockNum of the attempted packet
+     */
+    public int getBlockNum() {
+        return blockNum;
+    }
+
+    /**
+     * @return the stripeNum of the attempted packet
+     */
+    public int getStripeNum() {
+        return stripeNum;
+    }
+
+    /**
+     * @return the packetIndex of the existing copy of the packet.  -1 if the
+     * packetIndex is unknown or there was an attempt to write a padding packet
+     */
+    public int getPacketIndex() {
+        return packetIndex;
+    }
+}

Added: trunk/contrib/fec_src/com/onionnetworks/fec/io/FECFile.java
===================================================================
--- trunk/contrib/fec_src/com/onionnetworks/fec/io/FECFile.java 2006-09-26 
20:10:33 UTC (rev 10513)
+++ trunk/contrib/fec_src/com/onionnetworks/fec/io/FECFile.java 2006-09-26 
23:18:01 UTC (rev 10514)
@@ -0,0 +1,805 @@
+package com.onionnetworks.fec.io;
+
+import java.io.*;
+import java.util.*;
+import java.security.*;
+
+import org.apache.log4j.Category;
+
+import com.onionnetworks.fec.FECCode;
+import com.onionnetworks.fec.FECCodeFactory;
+import com.onionnetworks.io.RAF;
+import com.onionnetworks.util.*;
+import EDU.oswego.cs.dl.util.concurrent.*;
+
+/**
+ * This class provides the necessary file IO routines to go along with the raw
+ * FEC codes.  It is completely thread safe for multiple readers and writers,
+ * and will automatically encode and decode the packets as necessary.  If
+ * the FECFile is originally opened in "rw" mode it will revert to "r" mode
+ * once it has received enough data and decoded the entire file.
+ *
+ * File Encoding Example:
+ * <code>
+ *   File f = new File(args[0]);
+ * 
+ *   int k = 32;  // source packet per block
+ *   int n = 256; // number of packets to expand each block to.
+ *   int packetSize = 1024; // number of bytes in each packet.
+ *
+ *   FECParameters params = new FECParameters(k,n,packetSize,f.length());
+ *
+ *   FECFile fecF = new FECFile(f,"r",params);
+ *
+ *   // Read the packet with blockNum=0,stripeNum=32 into the Buffer
+ *   // The read() interface is much faster if you encode multiple packets
+ *   // per block and encourages you to do so.  We are not going to for
+ *   // simplicity.
+ *   Buffer b = new Buffer(packetSize);
+ *   fecF.read(new Buffer[] {b}, 0, new int[] {32});
+ * </code>
+ *  
+ * Please see tests/FECFileTest for a further example.
+ *
+ * <ul>
+ * Thread/synchronization behavior:
+ *
+ * The FECFile tries to maintain as little contention as possible, but seeing
+ * as it involves both IO intensive and processor intensive functions, this is
+ * rather difficult.
+ *
+ * <li> Disk IO is fully synchronized.  This doesn't really make any difference
+ * because you can't do two parallel IO operations on the same file anyway.
+ *
+ * <li> The highest level of synchronization is per-block reader/writer locks.
+ * This means that any number of threads may read/encode in parallel and that
+ * a write or decode operation blocks all other threads trying to access the
+ * same block.  Since encoding is a CPU intensive task, there will be minimal
+ * gains from parallel encoding, but the opportunity for encoding and disk IO
+ * to occur in parallel may afford some savings.
+ * </ul>
+ *
+ * (c) Copyright 2001 Onion Networks
+ * (c) Copyright 2000 OpenCola
+ *
+ * @author Justin F. Chapweske (justin at chapweske.com)
+ */
+public class FECFile {
+
+    static Category cat = Category.getInstance(FECFile.class.getName());
+
+    private FECParameters params;
+
+    private RAF raf; // synched RandomAccessFile
+    private PacketPlacement pp;
+    private FileIntegrity integrity;
+    private MessageDigest md;
+
+    private FECCode code;
+
+    private ReflectiveEventDispatch dispatch;
+
+    private int k,n,packetSize,blockSize,blockCount;
+
+    private ReadWriteLock[] locks;
+
+    private Decoder decoder;
+
+    protected ExceptionHandler exceptionHandler;
+
+    /**
+     * Open the file according to <code>mode</code> (either "r" or "rw") and 
+     * encoding/decoding according to the FECParameters.  This constructor
+     * will simply set the destination file to be the same as f if it is opened
+     * in "rw" mode.
+     *
+     * @param f The File to which the data will be read/written
+     * @param mode Either "r" or "rw"
+     * @param params The FECParameters that specify how the data will be
+     * encoded and decoded.
+     */
+    public FECFile(File f, String mode, FECParameters params) 
+       throws IOException {
+        this(f,mode,params,null);
+    }
+
+    /**
+     * Open the file according to <code>mode</code> (either "r" or "rw") and 
+     * encoding/decoding according to the FECParameters.  This constructor
+     * will simply set the destination file to be the same as f if it is opened
+     * in "rw" mode.
+     *
+     * @param f The File to which the data will be read/written
+     * @param mode Either "r" or "rw"
+     * @param params The FECParameters that specify how the data will be
+     * encoded and decoded.
+     * @param integrity Used for checking the file integrity.  Required in
+     * "rw" mode; must be null in read-only mode.
+     */
+    public FECFile(File f, String mode, FECParameters params, 
+                   FileIntegrity integrity) throws IOException {
+        this.params = params;
+        this.integrity = integrity;
+        this.k = params.getK();
+        this.n = params.getN();
+        this.packetSize = params.getPacketSize();
+        this.blockSize = params.getUnexpandedBlockSize();
+        this.blockCount = params.getBlockCount();
+
+        // Validate the arguments before acquiring any resource, so that a
+        // rejected call leaks neither the open file nor the event dispatcher.
+        final boolean rw = mode.equals("rw");
+        if (rw) {
+            if (integrity == null) {
+                throw new IllegalArgumentException("Integrity required in rw");
+            }
+            // set up the message digest
+            try {
+                md = MessageDigest.getInstance(integrity.getAlgorithm());
+            } catch (NoSuchAlgorithmException e) {
+                throw new IllegalArgumentException(e.getMessage());
+            }
+        } else if (integrity != null) {
+            throw new IllegalArgumentException
+                ("Integrity not used in read-only mode");
+        }
+
+        this.code = FECCodeFactory.getDefault().createFECCode(k,n);
+        this.raf = new RAF(f,mode); // synched RandomAccessFile
+        // Create the locks.
+        locks = new ReadWriteLock[blockCount];
+        for (int i=0;i<locks.length;i++) {
+            locks[i] = new ReentrantWriterPreferenceReadWriteLock();
+        }
+
+        // add the default exception handler.
+        setDecodeExceptionHandler(new ExceptionHandler() {
+                public void handleException(ExceptionEvent ev) {
+                    cat.warn("Set an ExceptionHandler via "+
+                             "FECFile.setDecodeExceptionHandler()",
+                             ev.getException());
+                }
+            });
+
+        if (rw) {
+            // create the event dispatcher (only used in rw mode)
+            dispatch = new ReflectiveEventDispatch();
+            // create the packet placement.  Once it is created it cannot
+            // be deleted to guarantee the thread-safety of some of the
+            // utility methods in this class.  Also note that the pp
+            // must never be set outside of the constructor.
+            this.pp = new PacketPlacement(params);
+            // set up and start the decoder.
+            decoder = new Decoder();
+            addFECIOListener(decoder);
+            new Thread(decoder,"Decoder Thread").start();
+        }
+    }
+
+    /**
+     * Sets the ExceptionHandler for dealing with problems that occur during
+     * decoding.
+     * 
+     * @param eh The object that will handle the exception.
+     */
+    public synchronized void setDecodeExceptionHandler(ExceptionHandler eh) {
+        this.exceptionHandler = eh;
+    }
+
+    protected synchronized ExceptionHandler getDecodeExceptionHandler() {
+        return exceptionHandler;
+    }
+
+    // We can't have a global buffer so I am praying that the GC can
+    // quickly reclaim these buffers and reallocate them.  If the GC doesn't
+    // work out for this then we'll have to create our own BufferPool.
+    protected static final byte[] createBuffer(int len) {
+        return new byte[len];
+    }
+
+    protected final byte[] createBuffer() {
+        return createBuffer(blockSize);
+    }
+
+    protected final Buffer[] wrapBuffer(byte[] b) {
+        if (b.length != blockSize) {
+            throw new IllegalArgumentException("b.length != "+blockSize);
+        }
+       // Wrap up the byte[] with some Buffers
+       Buffer[] bufs = new Buffer[k];
+       for (int i=0;i<bufs.length;i++) {
+           bufs[i] = new Buffer(b,i*packetSize,packetSize);
+       }
+        return bufs;
+    }
+
+    /**
+     * When the File is opened in r/w mode you must specify a destination.
+     * This method allows you to specify the destination lazily to allow
+     * downloads to take place before the user has even chosen the final
+     * location with their file picker.  Beware that the final write, the
+     * one that would cause the file to be moved and re-opened, will block
+     * until the destination file is set, so make sure to set it asap.
+     */
+    public void renameTo(File destFile) throws IOException {
+        raf.renameTo(destFile);
+    }
+
+    /**
+     * This method reads a number of packets (encoding them if necessary) into
+     * the provided buffers.  The method accepts an array of stripeNums and 
+     * an array of Buffers because it is vastly more efficient to encode
+     * multiple blocks all at once rather than across multiple method calls.
+     *
+     * Calls to read should be safe even during the rw->r switch because
+     * a packet that existed will never become unavailable.
+     *
+     * @param pkts The array of Buffers into which the data will be stored
+     * each Buffer must be <code>params.getPacketSize()</code> in length.
+     *
+     * @param blockNum The block num of the packets to encode.
+     * @param stripeNums The stripe nums of the packets to encode.
+     *
+     * @throws IOException If there is an IOException in the underlying file 
+     * IO.
+     * @throws PacketNotFoundException If the desired packet can not be found
+     * in the PacketPlacement.
+     */
+    public void read(Buffer[] pkts, int blockNum, int[] stripeNums) 
+        throws IOException, PacketNotFoundException {
+
+        if (blockNum < 0 || blockNum >= blockCount) {
+            throw new IllegalArgumentException
+                ("Illegal block# : blockNum="+blockNum+
+                 ",stripeNum="+stripeNums[0]);
+        }
+        
+        int ubs = -1;
+        byte[] b = null;
+
+        try {
+            locks[blockNum].readLock().acquire();
+           try {
+
+                // This raf check then pp access is safe because all
+                // locks must be acquired to change the raf's mode.
+                if (raf.getMode().equals("r") || pp.isBlockDecoded(blockNum)) {
+                    // FIX if all of the desired packets are on disk then
+                   // we shouldn't read in the whole block.
+                    // read inside the lock and encode outside.
+                    ubs = params.getUnexpandedBlockSize(blockNum);
+                    b = createBuffer();
+                    raf.seekAndReadFully(blockNum*blockSize,b,0,ubs);
+                    
+                } else {
+                    // read the packets straight from disk.
+                    for (int i=0;i<stripeNums.length;i++) {
+                       if (params.isPaddingPacket(blockNum,stripeNums[i])) {
+                           // deal with padding packet.
+                           Util.bzero(pkts[i].b,pkts[i].off,pkts[i].len);
+                           continue;
+                       }
+                        int packetIndex = pp.getPacketIndex
+                            (blockNum,stripeNums[i]);
+                        if (packetIndex == -1) {
+                            throw new PacketNotFoundException
+                                ("Packet not on disk: blockNum="+blockNum+
+                                 ",stripeNum="+stripeNums[i],blockNum,
+                                 stripeNums[i]);
+                        }
+                        
+                        // we read in whole packets because they are already
+                        // zero padded if necessary.
+                        raf.seekAndReadFully(packetIndex*packetSize,pkts[i].b,
+                                             pkts[i].off,pkts[i].len);
+                    }
+                    return;
+                }
+           } finally {
+                locks[blockNum].readLock().release();
+           }
+       } catch (InterruptedException e) { 
+           throw new InterruptedIOException(e.toString());
+       }
+        
+        // encode outside of the locks.
+        // Keep the gaps as zero's for encoding.
+        Util.bzero(b,ubs,b.length-ubs);
+        code.encode(wrapBuffer(b),pkts,stripeNums);
+    }
+
+
+    /**
+     * Writes a packet to disk and fires a PacketWrittenEvent.  Once enough
+     * packets of a block have been written the decoder thread is expected to
+     * decode that block, which is a relatively costly process.
+     *
+     * If this packet is the last packet that needs to be written to disk 
+     * in order to decode the whole file, the block will be decoded and
+     * then the file will be properly truncated, closed, moved to the 
+     * destination file and re-opened in read-only mode.  If the destFile
+     * has not been set yet then this will block until that is set.
+     * @param pkt The Buffer from which the data will be written.
+     * @param blockNum The blockNum of the packet to be written.
+     * @param stripeNum The stripeNum of the packet to be written.
+     * @return The block packet count after writing this packet.  This
+     * gives you the order the packets were written in for this block.
+     * @throws IllegalArgumentException When blockNum is out of range.
+     * @throws IOException When there is a disk IOException.  
+     * @throws DuplicatePacketException When there is an attempt to write a
+     *   packet a second time.  
+     * @throws BlockAlreadyDecodedException When the block is already decoded. 
+     * @throws FileAlreadyDecodedException When you are trying to write a
+     *   packet when the file has been decoded and switched to read-only 
+     */
+    public int write(Buffer pkt, int blockNum, int stripeNum) 
+        throws IOException, FileAlreadyDecodedException {
+        // Same range check as read(), instead of an AIOOBE from locks[].
+        if (blockNum < 0 || blockNum >= blockCount) {
+            throw new IllegalArgumentException
+                ("Illegal block# : blockNum="+blockNum+
+                 ",stripeNum="+stripeNum);
+        }
+
+        int result = -1;
+        try {
+            locks[blockNum].writeLock().acquire();
+            try {
+                result = write0(pkt,blockNum,stripeNum);
+            } finally {
+                locks[blockNum].writeLock().release();
+            }
+        } catch (InterruptedException e) { 
+            throw new InterruptedIOException(e.toString());
+        }
+        // Hopefully this will give the decoder thread a chance to catch up.
+        if (result >= k) {
+            Thread.yield();
+        }
+        return result;
+    }
+
+    /**
+     * Validates and writes a single packet, then fires a PacketWrittenEvent.
+     * The caller must hold locks[blockNum].writeLock().
+     *
+     * @return the packet count of the block after this write.
+     */
+    private int write0(Buffer pkt, int blockNum, int stripeNum) 
+        throws IOException, DuplicatePacketException, 
+        BlockAlreadyDecodedException, FileAlreadyDecodedException {
+        // read-only
+        if (raf.getMode().equals("r")) {
+            throw new FileAlreadyDecodedException
+                ("Attempted to write packet in read-only mode");
+        }
+
+        // padding packet
+        if (params.isPaddingPacket(blockNum,stripeNum)) {
+            cat.warn
+                ("You have attempted to write a padding packet which "+
+                 "is already generated by this FECFile and shouldn't be "+
+                 "sent across the network.  Talk to Justin for more info. "+
+                 "blockNum="+blockNum+",stripeNum="+stripeNum);
+            throw new DuplicatePacketException
+                ("Attempted padding packet write. blockNum="+blockNum+
+                 ",stripeNum="+stripeNum,blockNum,stripeNum,-1);
+        }
+
+        // block already decoded
+        if (pp.isBlockDecoded(blockNum)) {
+            throw new BlockAlreadyDecodedException
+                ("Block already decoded : blockNum="+blockNum+",stripeNum="+
+                 stripeNum,blockNum,stripeNum);
+        }
+
+        // duplicate packet; the index is looked up once for check and report.
+        int existing = pp.getPacketIndex(blockNum,stripeNum);
+        if (existing != -1) {
+            throw new DuplicatePacketException
+                ("Duplicate packet: blockNum="+blockNum+",stripeNum="+
+                 stripeNum+",index="+existing,blockNum,stripeNum,existing);
+        }
+
+        int packetIndex = -1;
+        int packetCount = -1;
+        // sync around the calls so that the packetCount is correct.
+        synchronized (pp) {
+            packetIndex = pp.addPacketEntry(blockNum,stripeNum);
+            packetCount = pp.getPacketCount(blockNum);
+        }
+
+        raf.seekAndWrite(packetIndex*packetSize,
+                         pkt.b,pkt.off,pkt.len);
+
+        fire(new PacketWrittenEvent(this,blockNum,stripeNum,packetCount));
+        return packetCount;
+    }
+    
+    /**
+     * Try to decode the specified block.  This method will read in the block,
+     * decode it, and write the decoded block back to disk.
+     * @return true if the block was successfully decoded and verified.
+     */
+    protected boolean tryDecode(int blockNum, int[] stripeNums) 
+     throws IOException {
+       cat.debug("trying to decode block : blockNum="+blockNum);
+        
+       byte[] b = createBuffer();
+       Buffer[] bufs = wrapBuffer(b);
+       // read the packets from disk.
+       read(bufs,blockNum,stripeNums);
+        
+        // decode the block.
+        code.decode(bufs,stripeNums);
+      
+        // check the integrity
+        int ubs = params.getUnexpandedBlockSize(blockNum);
+       synchronized (md) {   
+           md.update(b,0,ubs);
+           Buffer hash = integrity.getBlockHash(blockNum);
+           if (!Util.arraysEqual(md.digest(),0,hash.b,hash.off,hash.len)) {
+               return false;
+           }
+       }
+
+       try {
+           locks[blockNum].writeLock().acquire();
+           try {
+               // seek and write the decoded block.
+               raf.seekAndWrite(blockNum*blockSize,b,0,b.length);
+               // Update the placement to show decoded entries.
+               pp.setBlockDecoded(blockNum);
+           } finally {
+               locks[blockNum].writeLock().release();
+           }
+       } catch (InterruptedException e) { 
+           throw new InterruptedIOException(e.toString());
+       }
+        
+       fire(new BlockDecodedEvent(this,blockNum));
+       return true;
+    }
+      
+    /**
+     * Acquires all write block locks, or none of them if interrupted.  MUST
+     * NOT be called by a thread already holding a block lock (deadly embrace).
+     */
+    public void acquireAllWriteLocks() throws InterruptedException {
+        for (int i=0;i<locks.length;i++) {
+            try {
+                locks[i].writeLock().acquire();
+            } catch (InterruptedException e) { // undo the partial acquisition
+                while (i > 0) locks[--i].writeLock().release();
+                throw e;
+            }
+        }
+    }
+
+    /** Releases all write block locks taken by acquireAllWriteLocks(). */
+    public void releaseAllWriteLocks() throws InterruptedException {
+        for (int i=0;i<locks.length;i++) {
+            locks[i].writeLock().release();
+        }
+    }
+
+    /**
+     * Close the underlying file descriptor and free up the resources 
+     * associated with this FECFile.
+     */
+    public void close() throws IOException {
+        try {
+            acquireAllWriteLocks();
+           try {
+                raf.close();
+           } finally {
+                releaseAllWriteLocks();
+           }
+       } catch (InterruptedException e) { 
+           throw new InterruptedIOException(e.toString());
+       }
+
+        if (decoder != null) {
+            // clean up the decoder.
+            decoder.close();
+        }
+
+        if (dispatch != null) {
+            // Free up the dispatch thread.
+            dispatch.close();
+        }
+    }
+
+    /**
+     * Adds a new FECIOListener.
+     *
+     */
+    public void addFECIOListener(FECIOListener fil) {
+        if (dispatch == null) {
+            throw new IllegalStateException("No events in read-only mode");
+        }
+        dispatch.addListener(this,fil, FECIOListener.EVENTS);
+    }
+
+    public void removeFECIOListener(FECIOListener fil) {
+        if (dispatch == null) {
+            throw new IllegalStateException("No events in read-only mode");
+        }
+        dispatch.removeListener(this,fil, FECIOListener.EVENTS);
+    }
+
+    /**
+     * Fire an event.
+     */
+    protected void fire(FECIOEvent ev) {
+        dispatch.fire(ev,"notify");
+    }
+    
+    /**
+     * @return The decoded block count
+     */
+    public int getDecodedBlockCount() {
+        // This should be safe to access w/o the locks because it is a
+        // read-only operation and the PacketPlacement is completely
+        // synchronized.  Also note that the pp == null check should be
+        // safe because pp can only be set in the constructor and once
+        // it has been created it will never be deleted.
+        return pp == null ? blockCount : pp.getDecodedBlockCount();
+    }
+
+    /**
+     * @return true if the block is decoded.
+     */
+    public boolean isBlockDecoded(int blockNum) {
+        // This should be safe to access w/o the locks because it is a
+        // read-only operation and the PacketPlacement is completely
+        // synchronized.  Also note that the pp == null check should be
+        // safe because pp can only be set in the constructor and once
+        // it has been created it will never be deleted.
+        return pp == null ? true : pp.isBlockDecoded(blockNum);
+    }
+
+    /**
+     * @return true if the FECFile contains the specified packet.
+     */
+    public boolean containsPacket(int blockNum, int stripeNum) {
+        // This should be safe to access w/o the locks because it is a
+        // read-only operation and the PacketPlacement is completely
+        // synchronized.  Also note that the pp == null check should be
+        // safe because pp can only be set in the constructor and once
+        // it has been created it will never be deleted.
+        if (pp == null) {
+            return true;
+        }
+        synchronized (pp) {
+            if (pp.isBlockDecoded(blockNum)) {
+                return true;
+            } else {
+                return pp.getPacketIndex(blockNum,stripeNum) != -1;
+            }
+        }
+    } 
+
+    /**
+     * @return The number of packets written to disk.
+     */
+    public int getWrittenCount() {
+        // This should be safe to access w/o the locks because it is a
+        // read-only operation and the PacketPlacement is completely
+        // synchronized.  Also note that the pp == null check should be
+        // safe because pp can only be set in the constructor and once
+        // it has been created it will never be deleted.
+        return pp == null ? params.getUnexpandedPacketCount() : 
+            pp.getWrittenCount();
+    }
+
+    /** 
+     * @return The FECParameters used for encoding/decoding. 
+     */
+    public FECParameters getFECParameters() { 
+        return params; 
+    }
+    
+    /**
+     * @return true if this file is decoded.
+     */
+    public boolean isDecoded() {
+        // no locking required, raf is already synchronized.
+        return raf.getMode().equals("r");
+    }
+
+    /**
+     * Blocks until the file has been decoded, so callers know when it is safe
+     * to close the FECFile without registering their own FECIOListener for the
+     * FileDecodedEvent.  Requires "rw" mode (no listeners exist in "r" mode).
+     */
+    public void waitForFileDecoded() throws InterruptedException {
+        FECIOListener fil = new FECIOListener() {
+                public void notify(FECIOEvent ev) {
+                    if (ev instanceof FileDecodedEvent) {
+                        synchronized (this) { this.notify(); }
+                    }
+                }
+            };
+        // Register while holding fil's monitor so that the event cannot be
+        // delivered before we start waiting on it.
+        synchronized (fil) {
+            addFECIOListener(fil);
+            try {
+                // isDecoded() is always set to true before the event is
+                // thrown; loop because Object.wait() may wake spuriously.
+                while (!isDecoded()) {
+                    fil.wait();
+                }
+            } finally {
+                removeFECIOListener(fil); // also when wait() is interrupted
+            }
+        }
+    }
+
+    public class Decoder implements Runnable, FECIOListener {
+       
+       LinkedList queue = new LinkedList();
+       boolean done = false;
+
+       public void notify(FECIOEvent ev) {
+           if (ev instanceof PacketWrittenEvent) {
+               synchronized (queue) {
+                   queue.add(ev);
+                   queue.notify();
+               }
+           }
+       }
+
+        public void close() {
+            synchronized (queue) {
+                // close already called.
+                if (done) {
+                    return;
+                }
+                removeFECIOListener(this);
+                done = true;
+                queue.clear();
+                queue.notify();
+            }
+        }
+
+        /**
+         * Generates the next combo for decoding.
+         * @param combo (in/out) Modified to provide the next combo.
+         * It is expected to hold strictly increasing indexes in [0,n) and
+         * is advanced to its lexicographic successor.
+         * @param n The number of elements in the set to choose the combo from.
+         *
+         * @return false if there are no more combos to generate.
+         */
+        private final boolean nextCombo(int[] combo, int n) {
+           // loop through the list from back to front generating combos
+           for (int i=combo.length-1;i>=0;i--) {
+               // search for the next.
+               if (combo[i] != (n-combo.length)+i) {
+                   combo[i]++;
+                   for (int j=i+1;j<combo.length;j++) {
+                       combo[j] = combo[j-1]+1;
+                   }
+                   return true;
+               }
+           }
+           return false;
+       }
+
+       public void run() {
+           PacketWrittenEvent ev = null;
+           
+           while (true) {
+               synchronized (queue) {
+                    if (done) {
+                        return;
+                    } else if (queue.isEmpty()) {
+                       try {
+                           queue.wait();
+                       } catch (InterruptedException e) {
+                           return;
+                       }
+                       continue;
+                   } else {
+                       ev = (PacketWrittenEvent) queue.removeFirst();
+                   }
+               }
+
+               // In most cases here no locks are necessary because this is 
+               // the only thread that can cause decoding to occur and we
+               // only get information that doesn't change depending on the
+               // actions of other threads.
+
+               int blockNum = ev.getBlockNum();
+               int blockPacketCount = ev.getBlockPacketCount();
+
+               int paddingCount = k-params.getUnexpandedPacketCount(blockNum);
+
+               if (blockPacketCount < k-paddingCount) {                
+                   // not enough packets.
+                   continue;
+               } else if (pp.isBlockDecoded(blockNum)) {
+                   // already decoded.
+                    // Don't throw an event because we already threw a 
+                    // PacketWrittenEvent.
+                   continue;
+               }
+                
+                // Only get up to blockPacketCount stripes because we use
+                // an iterative combination algorithm.
+               int[] possibleStripes = pp.getStripeNums(blockNum,
+                                                        blockPacketCount);
+               int[] stripeNums = new int[k];
+
+               // The last written packet is in every combo.
+               stripeNums[k-paddingCount-1] = possibleStripes
+                 [possibleStripes.length-1];
+              
+               // The padding packets is in every combo.
+               for (int i=k-paddingCount;i<k;i++) {
+                   stripeNums[i] = i;
+               }
+
+               // initialize the combos
+               int[] combo = new int[k-paddingCount-1];
+                for (int i=0;i<combo.length;i++) {
+                    combo[i] = i;
+                }
+
+               do {
+                   // copy the possibleStripes into the stripeNums using the 
+                   // given combo.
+                   for (int i=0;i<combo.length;i++) {
+                       stripeNums[i] = possibleStripes[combo[i]];
+                   }
+
+                    StringBuffer sb = new StringBuffer("stripeNums="+
+                                                       stripeNums[0]);
+                   for (int i=1;i<stripeNums.length;i++) {
+                       sb.append(","+stripeNums[i]);
+                   }
+                    cat.debug(sb);
+                   
+                   try {                   
+                       if (tryDecode(blockNum,stripeNums)) {
+                           break;
+                       }
+                   } catch (IOException e) {
+                        getDecodeExceptionHandler().handleException
+                           (new ExceptionEvent(FECFile.this,e));
+                   }
+               } while (nextCombo(combo,blockPacketCount-1));
+        
+                try {
+                    // It should be safe to check getDecodedBlockCount() w/o
+                    // a lock because this is the only thread that can modify
+                    // that value.
+                   if (pp.getDecodedBlockCount() == params.getBlockCount()) {
+                       // download and decoding is complete, clean up, move
+                       // the file and reopen it in read-only mode.
+                       cat.debug("File Decoded, switching to read-only");
+
+                       acquireAllWriteLocks();
+                       try {
+                           // Download and decoding is complete.
+                           raf.setLength(params.getFileSize());
+                           raf.setReadOnly();
+                       } finally {
+                           releaseAllWriteLocks();
+                       }
+                       fire(new FileDecodedEvent(FECFile.this));
+                       
+                       // All done decoding, clean up and return.
+                        close();
+                        return;
+                   }
+               } catch (Exception e) { 
+                    getDecodeExceptionHandler().handleException
+                       (new ExceptionEvent(FECFile.this,e));
+               }
+            }
+       }
+    }
+} 

Added: trunk/contrib/fec_src/com/onionnetworks/fec/io/FECIOEvent.java
===================================================================
--- trunk/contrib/fec_src/com/onionnetworks/fec/io/FECIOEvent.java      
2006-09-26 20:10:33 UTC (rev 10513)
+++ trunk/contrib/fec_src/com/onionnetworks/fec/io/FECIOEvent.java      
2006-09-26 23:18:01 UTC (rev 10514)
@@ -0,0 +1,17 @@
+package com.onionnetworks.fec.io;
+
+import java.util.*;
+
+/**
+ * Superclass for all FEC IO related events.
+ *
+ * (c) Copyright 2001 Onion Networks
+ * (c) Copyright 2000 OpenCola
+ *
+ * @author Justin F. Chapweske (justin at chapweske.com)
+ */
+public class FECIOEvent extends EventObject {
+    public FECIOEvent(Object source) {
+       super(source);
+    }
+}

Added: trunk/contrib/fec_src/com/onionnetworks/fec/io/FECIOListener.java
===================================================================
--- trunk/contrib/fec_src/com/onionnetworks/fec/io/FECIOListener.java   
2006-09-26 20:10:33 UTC (rev 10513)
+++ trunk/contrib/fec_src/com/onionnetworks/fec/io/FECIOListener.java   
2006-09-26 23:18:01 UTC (rev 10514)
@@ -0,0 +1,21 @@
+package com.onionnetworks.fec.io;
+
+import java.util.*;
+
+/**
+ * This interface is implemented to listen for FECIOEvents
+ *
+ * (c) Copyright 2001 Onion Networks
+ * (c) Copyright 2000 OpenCola
+ *
+ * @author Justin F. Chapweske (justin at chapweske.com)
+ */
+public interface FECIOListener extends EventListener {
+
+    /** The string "notify", i.e. the name of the callback method below. */
+    String NOTIFY = "notify";
+
+    /** Array holding every event name defined here (only NOTIFY). */
+    String[] EVENTS = { NOTIFY };
+
+    /**
+     * Callback invoked with the event that was fired.
+     *
+     * @param ev The event being delivered.
+     */
+    void notify(FECIOEvent ev);
+}
+

Added: trunk/contrib/fec_src/com/onionnetworks/fec/io/FECParameters.java
===================================================================
--- trunk/contrib/fec_src/com/onionnetworks/fec/io/FECParameters.java   
2006-09-26 20:10:33 UTC (rev 10513)
+++ trunk/contrib/fec_src/com/onionnetworks/fec/io/FECParameters.java   
2006-09-26 23:18:01 UTC (rev 10514)
@@ -0,0 +1,248 @@
+package com.onionnetworks.fec.io;
+
+import com.onionnetworks.util.Util;
+
+/**
+ * This class contains all of the functions for performing packet, block, and
+ * stripe calculations given a set of FEC parameters.  Most important are the 
+ * boundary conditions which are difficult to keep straight.
+ *
+ * FECParameters objects are immutable, therefore they may be safely used
+ * without synchronization.
+ *
+ * (c) Copyright 2001 Onion Networks
+ * (c) Copyright 2000 OpenCola
+ *
+ * @author Justin F. Chapweske (justin at chapweske.com)
+ * @author Ry4an Brase (ry4an at ry4an.org)
+ */
+public class FECParameters {
+
+    protected final int n,k,packetSize,numBlocks,firstGapStripe;
+    protected final long fileSize;
+    
+    /**
+     * Creates a new FECParameters instance.
+     *
+     * @param k The number of vanilla packets per block.
+     * @param n The number of packets that the k vanilla packets can be 
+     * expanded to.
+     * @param packetSize The size of each packet.
+     * @param fileSize The size of the original, unencoded file.
+     */
+    public FECParameters (int k, int n, int packetSize, long fileSize) {
+        if (k <= 0 || n < k || packetSize <= 0 || fileSize <= 0) {
+            throw new IllegalArgumentException
+                ("Argument is < 0 or n < k :"+"k="+k+",n="+n+
+                 ",packetSize="+packetSize+",fileSize="+fileSize);
+        }
+
+        this.k = k;
+        this.n = n;
+        this.packetSize = packetSize;
+        this.fileSize = fileSize;
+
+        // Round up after division.
+        numBlocks = Util.divideCeil(fileSize,packetSize*k);
+
+        firstGapStripe = Util.divideCeil(getUnexpandedBlockSize(numBlocks-1),
+                                         packetSize);
+    }
+     
+    /**
+     * @return The size of the original, unencoded file.
+     */
+    public long getFileSize() {
+        return fileSize;
+    }
+
+    /**
+     * @return k, The number of vanilla packets per block.
+     */
+    public int getK() {
+        return k;
+    }
+
+    /**
+     * @return n, The number of packets that the k vanilla packets can be 
+     * expanded to.
+     */
+    public int getN() {
+        return n;
+    }
+
+    /**
+     * @return The maximum (and default) size that most packets will be.
+     */
+    public int getPacketSize() {
+        return packetSize;
+    }
+
+    /**
+     * @return The maximum size (in bytes) that a stripe will be.
+     */
+    public int getMaxStripeSize() {
+        return packetSize*numBlocks;
+    }
+
+    /**
+     * @return The default number of vanilla bytes that a block will contain.
+     */
+    public int getUnexpandedBlockSize() {
+        return k*packetSize;
+    }
+
+    /**
+     * @return The maximum number of bytes that a fully encoded block can 
+     * contain.
+     */
+    public int getExpandedBlockSize() {
+        return n*packetSize;
+    }
+
+    /**
+     * @return The number of blocks that this file will be partioned into.
+     */
+    public int getBlockCount() {
+        return numBlocks;
+    }
+
+    /** 
+     * @return The maximum number of stripes (N) that be created from this
+     * file.
+     */
+    public int getNumStripes() {
+        return n;
+    }
+
+    /**
+     * @return The number of packets required to send across the original 
+     * file.  Also the minimum number of packets required to recreate the
+     * original file.
+     */
+    public int getUnexpandedPacketCount() {
+       return Util.divideCeil(fileSize,packetSize);
+    }
+
+    /**
+     * @param blockNum The blockNum for which to count packets.
+     *
+     * @return The number of packets requried to send across the original
+     * block.
+     */
+    public int getUnexpandedPacketCount(int blockNum) {
+       return Util.divideCeil(getUnexpandedBlockSize(blockNum),packetSize);
+    }
+    
+    /**
+     * @param whichStripe The stripe for which we are counting packets.
+     *
+     * @return The number of packets in <code>whichStripe</code>.  Most of the
+     * time this will be equal to <code>numBlocks</code>, but if this stripe
+     * lands on a gap in the last block then it will contain <code>numBlocks-1
+     * </code>
+     */
+    public int getStripePacketCount(int whichStripe) {
+        if (whichStripe >= firstGapStripe && whichStripe < k) {
+            return numBlocks-1;
+        } else {
+            return numBlocks;
+        }
+    }
+    
+    /**
+     * @param whichBlock The block which the packet is in.
+     * @param whichStripe The stripe which the packet is in, or in other words
+     * the index of the packet within <code>whichBlock</code>
+     *
+     * @return The size of the packet.  Normally the packet size will be
+     * the same as <code>getPacketSize()</code>.  But if the packet is in
+     * the last block then it may be smaller.  If the packet is in the last
+     * block and is in a gap between the end of the file and K, then it's size
+     * will be 0.  Also if the packet is the last packet in the file (the 
+     * packet right before the gap), then the packetSize > 0 && < maxPacketSize
+     */
+    public int getPacketSize(int whichBlock, int whichStripe) {
+        if (whichBlock == numBlocks-1) {
+            if (whichStripe >= firstGapStripe && whichStripe < k) {
+                return 0;
+            }
+            if (whichStripe == firstGapStripe-1) {
+                return ((int)fileSize) % packetSize;
+            }
+        } 
+        return packetSize;
+    }
+
+    /**
+     * @param whichStripe The stripe for which we are finding the size of.
+     *
+     * @return The size of the stripe (in bytes).  Normally the stripe size
+     * will simply be <code>packetSize*numBlocks</code> But if the stripe falls
+     * on a gap it will be less.
+     */
+    public long getStripeSize(int whichStripe) {
+        if (getStripePacketCount(whichStripe) == numBlocks-1) {
+            return packetSize*(numBlocks-1);
+        } else if (whichStripe == firstGapStripe-1) {
+            if (fileSize % packetSize != 0) {
+                return (packetSize*(numBlocks-1)) + (fileSize % packetSize);
+            }
+        } 
+            
+        return packetSize*numBlocks;
+    }
+
+    /**
+     * @param whichBlock The block for which we are finding the size of.
+     * 
+     * @return The size of the unexpanded block.  Normally this will simply
+     * be <code>k*packetSize</code>.  But if this is the last block then
+     * it may be less.
+     */
+    public int getUnexpandedBlockSize(int whichBlock) {
+        if (whichBlock == numBlocks-1) {
+            if (numBlocks == 1) {
+                return (int) fileSize;
+            } else {
+                return ((int) fileSize) - (k*(numBlocks-1)*packetSize);
+            }
+        }
+        return k*packetSize;
+    }
+
+    /**
+     * Padding packets are empty packets that should never been read or
+     * written and shouldn't be sent across the network (they are all '0's)
+     *
+     * @param blockNum The blockNum of the packet to check
+     * @param stripeNum The stripeNum of the packet to check
+     *
+     * @return true if this packet is a padding packet.
+     */
+    public boolean isPaddingPacket(int blockNum, int stripeNum) {
+        return blockNum == numBlocks-1 && 
+            stripeNum >= getUnexpandedPacketCount(blockNum) &&
+            stripeNum < k;
+    }
+
+    public int hashCode() {
+        return k * n * (int) fileSize * packetSize;
+    }
+
+    public boolean equals(Object obj) {
+        if (obj instanceof FECParameters) {
+            FECParameters f2 = (FECParameters) obj;
+            if (f2.k == k && f2.n == n && f2.fileSize == fileSize && 
+                f2.packetSize == packetSize) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    public String toString() {
+        return "FECParameters(k="+k+",n="+n+",packetSize="+packetSize+
+            ",fileSize="+fileSize+")";
+    }
+}

Added: 
trunk/contrib/fec_src/com/onionnetworks/fec/io/FileAlreadyDecodedException.java
===================================================================
--- 
trunk/contrib/fec_src/com/onionnetworks/fec/io/FileAlreadyDecodedException.java 
    2006-09-26 20:10:33 UTC (rev 10513)
+++ 
trunk/contrib/fec_src/com/onionnetworks/fec/io/FileAlreadyDecodedException.java 
    2006-09-26 23:18:01 UTC (rev 10514)
@@ -0,0 +1,23 @@
+package com.onionnetworks.fec.io;
+
+import java.io.IOException;
+
+/**
+ * This exception signals that there was an attempt to write a packet to a
+ * file that has already been completely decoded. 
+ *  
+ * (c) Copyright 2001 Onion Networks
+ * (c) Copyright 2000 OpenCola
+ *
+ * @author Justin F. Chapweske (justin at chapweske.com)
+ */
+public class FileAlreadyDecodedException extends IOException {
+    
+    public FileAlreadyDecodedException() {
+        super();
+    }
+
+    public FileAlreadyDecodedException(String msg) {
+        super(msg);
+    }
+}

Added: trunk/contrib/fec_src/com/onionnetworks/fec/io/FileDecodedEvent.java
===================================================================
--- trunk/contrib/fec_src/com/onionnetworks/fec/io/FileDecodedEvent.java        
2006-09-26 20:10:33 UTC (rev 10513)
+++ trunk/contrib/fec_src/com/onionnetworks/fec/io/FileDecodedEvent.java        
2006-09-26 23:18:01 UTC (rev 10514)
@@ -0,0 +1,17 @@
+package com.onionnetworks.fec.io;
+
+/**
+ * This event signifies that the file has been completely decoded.
+ *
+ * (c) Copyright 2001 Onion Networks
+ * (c) Copyright 2000 OpenCola
+ *
+ * @author Justin F. Chapweske (justin at chapweske.com)
+ */
+public class FileDecodedEvent extends FECIOEvent {
+
+    public FileDecodedEvent(Object source) {
+       super(source);
+    }
+}
+       

Added: trunk/contrib/fec_src/com/onionnetworks/fec/io/PacketEntryAddedEvent.java
===================================================================
--- trunk/contrib/fec_src/com/onionnetworks/fec/io/PacketEntryAddedEvent.java   
2006-09-26 20:10:33 UTC (rev 10513)
+++ trunk/contrib/fec_src/com/onionnetworks/fec/io/PacketEntryAddedEvent.java   
2006-09-26 23:18:01 UTC (rev 10514)
@@ -0,0 +1,27 @@
+package com.onionnetworks.fec.io;
+
+/**
+ * This event signifies that a new packet entry has been added.
+ *
+ * (c) Copyright 2001 Onion Networks
+ * (c) Copyright 2000 OpenCola
+ *
+ * @author Justin F. Chapweske (justin at chapweske.com)
+ */
+public class PacketEntryAddedEvent extends FECIOEvent {
+
+    int packetIndex;
+
+    public PacketEntryAddedEvent(Object source, int packetIndex) {
+       super(source);
+        this.packetIndex = packetIndex;
+    }
+
+    /**
+     * @return the packetIndex of the packet just written
+     */
+    public int getPacketIndex() {
+        return packetIndex;
+    }
+}
+       

Added: trunk/contrib/fec_src/com/onionnetworks/fec/io/PacketInfo.java
===================================================================
--- trunk/contrib/fec_src/com/onionnetworks/fec/io/PacketInfo.java      
2006-09-26 20:10:33 UTC (rev 10513)
+++ trunk/contrib/fec_src/com/onionnetworks/fec/io/PacketInfo.java      
2006-09-26 23:18:01 UTC (rev 10514)
@@ -0,0 +1,41 @@
+package com.onionnetworks.fec.io;
+
+/**
+ * This class provides a (blockNum,stripeNum) tuple that is comparable for
+ * equality.
+ *
+ * (c) Copyright 2001 Onion Networks
+ * (c) Copyright 2000 OpenCola
+ *
+ * @author Justin F. Chapweske (justin at chapweske.com)
+ */
+public class PacketInfo {
+
+    int blockNum, stripeNum;
+
+    /**
+     * Creates a new (blockNum,stripeNum) tuple.
+     *
+     * @param blockNum The blockNum of the packet.
+     * @param stripeNum The stripeNum of the packet.
+     */
+    public PacketInfo(int blockNum, int stripeNum) {
+        this.blockNum = blockNum;
+        this.stripeNum = stripeNum;
+    }
+
+    /**
+     * @return the blockNum of this tuple.
+     */
+    public int getBlockNum() {
+        return blockNum;
+    }
+
+    /**
+     * @return the stripeNum of this tuple.
+     */
+    public int getStripeNum() {
+        return stripeNum;
+    }
+
+    /**
+     * @return a hash combining both fields.  A plain product is avoided
+     * because it is 0 for every packet in block 0 or in stripe 0, which
+     * would put all of those packets in the same hash bucket.
+     */
+    public int hashCode() {
+        return 31*blockNum + stripeNum;
+    }
+
+    /**
+     * @param obj The object to compare with.
+     *
+     * @return true if <code>obj</code> is a PacketInfo with the same
+     * blockNum and stripeNum.
+     */
+    public boolean equals(Object obj) {
+        if (obj instanceof PacketInfo && 
+            ((PacketInfo) obj).blockNum == blockNum &&
+            ((PacketInfo) obj).stripeNum == stripeNum) {
+            return true;
+        }
+        return false;
+    }
+}

Added: 
trunk/contrib/fec_src/com/onionnetworks/fec/io/PacketNotFoundException.java
===================================================================
--- trunk/contrib/fec_src/com/onionnetworks/fec/io/PacketNotFoundException.java 
2006-09-26 20:10:33 UTC (rev 10513)
+++ trunk/contrib/fec_src/com/onionnetworks/fec/io/PacketNotFoundException.java 
2006-09-26 23:18:01 UTC (rev 10514)
@@ -0,0 +1,39 @@
+package com.onionnetworks.fec.io;
+
+import java.io.IOException;
+
+/**
+ * This exception signals that there was an attempt to read a packet that
+ * can't be found, or created from the data on disk.
+ *  
+ * (c) Copyright 2001 Onion Networks
+ * (c) Copyright 2000 OpenCola
+ *
+ * @author Justin F. Chapweske (justin at chapweske.com)
+ */
+public class PacketNotFoundException extends IOException {
+    
+    int blockNum, stripeNum;
+
+    public PacketNotFoundException(int blockNum, int stripeNum) {
+        super();
+
+        this.blockNum = blockNum;
+        this.stripeNum = stripeNum;
+    }
+
+    public PacketNotFoundException(String msg, int blockNum, int stripeNum) {
+        super(msg);
+
+        this.blockNum = blockNum;
+        this.stripeNum = stripeNum;
+    }
+
+    public int getBlockNum() {
+        return blockNum;
+    }
+
+    public int getStripeNum() {
+        return stripeNum;
+    }
+}

Added: trunk/contrib/fec_src/com/onionnetworks/fec/io/PacketPlacement.java
===================================================================
--- trunk/contrib/fec_src/com/onionnetworks/fec/io/PacketPlacement.java 
2006-09-26 20:10:33 UTC (rev 10513)
+++ trunk/contrib/fec_src/com/onionnetworks/fec/io/PacketPlacement.java 
2006-09-26 23:18:01 UTC (rev 10514)
@@ -0,0 +1,232 @@
+package com.onionnetworks.fec.io;
+
+import java.util.*;
+import org.apache.log4j.Category;
+
+/**
+ * This class allocates and tracks how packets are written to disk.  It
+ * is fully synchronized to safely support multi-threaded access.  Its
+ * data structures and operations are fairly well optimized.
+ *
+ * (c) Copyright 2001 Onion Networks
+ * (c) Copyright 2000 OpenCola
+ *
+ * @author Justin F. Chapweske (justin at chapweske.com)
+ */
+public class PacketPlacement {
+
+    // Sentinel stored in packetCount[] to mark a block as decoded.
+    public static final short DECODED_BLOCK = -1;
+
+    static Category cat = Category.getInstance(PacketPlacement.class.
+                                              getName());
+    
+    FECParameters params;
+
+    // decodedBlockCount: blocks marked decoded via setBlockDecoded().
+    // totalPacketCount: entries added via addPacketEntry().
+    // extEntryCount: entries placed in the extended area past blockCount*k.
+    int decodedBlockCount, totalPacketCount, extEntryCount;
+
+    // Tracks the number of packets in each block, if the block is decoded then
+    // it is set to DECODED_BLOCK.
+    short[] packetCount;
+
+    // List of all packets for each block in order written.
+    // A slot is null until the block gets its first entry and is reset to
+    // null once the block is decoded.
+    ArrayList[] entries;
+
+    // Maps block/stripe to packetIndexes.
+    // One HashMap (stripeNum -> packetIndex) per block; null under the same
+    // conditions as entries[].
+    HashMap[] revEntries;
+
+    // Cached from params: packets per block and number of blocks.
+    int k, blockCount;
+
+
+    /**
+     * Creates a new PacketPlacement object.
+     *
+     * @param params The FECParameters for this download.
+     */
+    public PacketPlacement(FECParameters params) {
+        
+       this.params = params;
+        
+       k = params.getK();
+        blockCount = params.getBlockCount();
+        
+        packetCount = new short[blockCount];
+        entries = new ArrayList[blockCount];
+        revEntries = new HashMap[blockCount];
+    }
+
+    /**
+     * @return the number of blocks that have been decoded and written 
+     * back to disk.
+     */
+    public synchronized int getDecodedBlockCount() {
+        return decodedBlockCount;
+    }
+
+    /**
+     * Signify that this block has been decoded.  The per-block entry list
+     * and reverse map are dropped.
+     *
+     * @param blockNum The blockNum of the decoded block.
+     *
+     * @throws IllegalStateException if the block is already marked decoded.
+     */
+    public synchronized void setBlockDecoded(int blockNum) {
+        if (packetCount[blockNum] == DECODED_BLOCK) {
+            throw new IllegalStateException
+                ("This block is already decoded:"+blockNum);
+        }
+        // This block has been decoded.
+        packetCount[blockNum] = DECODED_BLOCK;
+        decodedBlockCount++;
+        
+        // Update the forward and reverse entries
+        entries[blockNum] = null;
+        revEntries[blockNum] = null;
+    }
+
+    /**
+     * @param blockNum The blockNum of the block to check.
+     * @return true if the block has been decoded.
+     */
+    public synchronized boolean isBlockDecoded(int blockNum) {
+        return packetCount[blockNum] == DECODED_BLOCK;
+    }
+
+    /**
+     * @param blockNum the blockNum for which to count packets.
+     *
+     * @return The number of packets in the block.
+     * PacketPlacement.DECODED_BLOCK (-1) if the block is decoded.
+     */ 
+    public synchronized int getPacketCount(int blockNum) {
+        return packetCount[blockNum];
+    }
+
+    /**
+     * @return The total number of packet entries that have been added. 
+     */ 
+    public synchronized int getWrittenCount() {
+        return totalPacketCount;
+    }
+
+    /**
+     * Perform a reverse lookup on the index to find the index of a specific
+     * packet.
+     * 
+     * @param blockNum The blockNum of the packet.
+     * @param stripeNum The stripeNum of the packet.
+     *
+     * @return the packetIndex if the packet is on disk, else -1
+     */
+    public synchronized int getPacketIndex(int blockNum, int stripeNum) {
+        // If its decoded then the vanilla packets are on disk.
+        // Only stripes 0..k-1 have an index then; anything else yields -1.
+        if (isBlockDecoded(blockNum)) {
+            return (stripeNum >= 0 && stripeNum < k) ? 
+                blockNum*k+stripeNum : -1;
+        }
+
+        HashMap h = revEntries[blockNum];
+
+        // its not decoded, so a null h means that there are no packets.
+        if (h != null) {
+            Integer pi = (Integer) h.get(new Integer(stripeNum));
+            if (pi != null) {
+                return pi.intValue();
+            }  
+        }
+        return -1;
+    }
+
+    /**
+     * Add a new entry to an available slot.  The first k entries of a block
+     * get indexes inside the block's own area
+     * (<code>blockNum*k+count</code>); later entries get indexes in the
+     * extended area starting at <code>blockCount*k</code>.  The decoded
+     * check is made before the duplicate check.
+     *
+     * @param blockNum The blockNum of the entry.
+     * @param stripeNum the stripeNum of the entry.
+     *
+     * @return the packetIndex which the data will be written to.
+     *
+     * @throws IllegalStateException When the block is already decoded.
+     * @throws IllegalArgumentException When there is already an entry for
+     *   this blockNum/stripeNum.
+     */
+    public synchronized int addPacketEntry(int blockNum, int stripeNum) {
+
+        // block already decoded
+        if (isBlockDecoded(blockNum)) {
+            throw new IllegalStateException
+                ("block already decoded, blockNum="+blockNum+",stripeNum="+
+                 stripeNum);
+        } 
+
+        // duplicate packet.
+        if (getPacketIndex(blockNum,stripeNum) != -1) {
+            throw new IllegalArgumentException
+                ("Duplicate entry for blockNum="+blockNum+",stripeNum="+
+                 stripeNum+",packetIndex="+getPacketIndex(blockNum,stripeNum));
+        }
+        
+        // Count before this entry is added; it selects the slot below.
+        short blockPacketCount = (short) getPacketCount(blockNum);
+        int packetIndex = -1;
+
+       ArrayList blockEntries = entries[blockNum];
+  
+       // first packet for this block.
+       if (blockEntries == null) { 
+           blockEntries = new ArrayList();
+           entries[blockNum] = blockEntries;
+       }
+       
+       // Add the entry.
+       blockEntries.add(new Integer(stripeNum));
+
+        // check to see if this'll be an extended entry.
+        if (blockPacketCount < k) {
+           // regular entry in the block's area on disk.
+            packetIndex = blockNum*k+blockPacketCount; 
+        } else {
+            // extended entry at the end of the file.
+            packetIndex = blockCount*k+extEntryCount;
+            extEntryCount++;
+       }
+
+        HashMap revBlockEntries = revEntries[blockNum];
+        if (revBlockEntries == null) {
+            revBlockEntries = new HashMap();
+            revEntries[blockNum] = revBlockEntries;
+        }
+        revEntries[blockNum].put(new Integer(stripeNum),
+                                 new Integer(packetIndex));
+
+        packetCount[blockNum]++;
+        totalPacketCount++;        
+        
+        return packetIndex;
+    }
+    
+    /**
+     * Returns the list of stripes available for a specific blockNum.  These
+     * stripes will be in the order that they were added.
+     *
+     * Note: the entry list of a block is null before its first entry is
+     * added and after the block is decoded, so this method throws a
+     * NullPointerException in those states.
+     * 
+     * @param blockNum The blockNum for which you want stripes.
+     * 
+     * @return an int[] containing the stripes found for that blockNum.
+     */
+    public synchronized int[] getStripeNums(int blockNum) {
+       return getStripeNums(blockNum,entries[blockNum].size());
+    }
+   
+    /**
+     * Returns the first <code>count</code> stripes available for a specific
+     * blockNum.  These stripes will be in the order that they were added.
+     * 
+     * @param blockNum The blockNum for which you want stripes.
+     * @param count The number of stripes to return. 
+     * 
+     * @return an int[] containing the stripes found for that blockNum.
+     */
+    public synchronized int[] getStripeNums(int blockNum, int count) {
+       int[] result = new int[count];
+       for (int i=0;i<result.length;i++) {
+           result[i] = ((Integer) entries[blockNum].get(i)).intValue();
+       }
+       return result;
+    }
+}

Added: trunk/contrib/fec_src/com/onionnetworks/fec/io/PacketWrittenEvent.java
===================================================================
--- trunk/contrib/fec_src/com/onionnetworks/fec/io/PacketWrittenEvent.java      
2006-09-26 20:10:33 UTC (rev 10513)
+++ trunk/contrib/fec_src/com/onionnetworks/fec/io/PacketWrittenEvent.java      
2006-09-26 23:18:01 UTC (rev 10514)
@@ -0,0 +1,48 @@
+package com.onionnetworks.fec.io;
+
+import java.util.*;
+
+/**
+ * This event signifies that a new packet was received and written to disk.
+ *
+ * (c) Copyright 2001 Onion Networks
+ * (c) Copyright 2000 OpenCola
+ *
+ * @author Justin F. Chapweske (justin at chapweske.com)
+ */
+public class PacketWrittenEvent extends FECIOEvent {
+
+    // Values supplied to the constructor, reported by the getters below.
+    int blockNum;
+    int stripeNum;
+    int blockPacketCount;
+
+    /**
+     * Creates the event.
+     *
+     * @param source The object the event originated from.
+     * @param blockNum The blockNum of the packet just written.
+     * @param stripeNum The stripeNum of the packet just written.
+     * @param blockPacketCount The blockPacketCount for this write.
+     */
+    public PacketWrittenEvent(Object source, int blockNum, int stripeNum,
+                              int blockPacketCount)
+    {
+        super(source);
+        this.blockPacketCount = blockPacketCount;
+        this.stripeNum = stripeNum;
+        this.blockNum = blockNum;
+    }
+
+    /**
+     * @return the blockNum of the packet just written.
+     */
+    public int getBlockNum()
+    {
+        return this.blockNum;
+    }
+
+    /**
+     * @return the stripeNum of the packet just written.
+     */
+    public int getStripeNum()
+    {
+        return this.stripeNum;
+    }
+
+    /**
+     * @return the blockPacketCount for this write.  This value is used to
+     * tell how many packets have been written to disk at this time.  Remember
+     * that more packets can be written to disk than are needed for repair.
+     */
+    public int getBlockPacketCount()
+    {
+        return this.blockPacketCount;
+    }
+}
+       


Reply via email to