Author: toad
Date: 2007-05-30 19:25:57 +0000 (Wed, 30 May 2007)
New Revision: 13396

Modified:
   trunk/freenet/src/freenet/io/xfer/BulkReceiver.java
   trunk/freenet/src/freenet/io/xfer/BulkTransmitter.java
   trunk/freenet/src/freenet/io/xfer/PartiallyReceivedBulk.java
   trunk/freenet/src/freenet/support/BitArray.java
Log:
More work on bulk transfer

Modified: trunk/freenet/src/freenet/io/xfer/BulkReceiver.java
===================================================================
--- trunk/freenet/src/freenet/io/xfer/BulkReceiver.java 2007-05-30 19:12:37 UTC 
(rev 13395)
+++ trunk/freenet/src/freenet/io/xfer/BulkReceiver.java 2007-05-30 19:25:57 UTC 
(rev 13396)
@@ -24,4 +24,12 @@
                this.peer = peer;
                this.uid = uid;
        }
+
+       /**
+        * Called when the transfer fails because of a Throwable being thrown.
+        * Not implemented yet: the body is an empty stub, so the failure is
+        * currently ignored by this receiver.
+        * @param t The throwable.
+        */
+       public void fail(Throwable t) {
+               // TODO Auto-generated method stub
+       }
 }

Modified: trunk/freenet/src/freenet/io/xfer/BulkTransmitter.java
===================================================================
--- trunk/freenet/src/freenet/io/xfer/BulkTransmitter.java      2007-05-30 
19:12:37 UTC (rev 13395)
+++ trunk/freenet/src/freenet/io/xfer/BulkTransmitter.java      2007-05-30 
19:25:57 UTC (rev 13396)
@@ -4,6 +4,7 @@
 package freenet.io.xfer;

 import freenet.io.comm.PeerContext;
+import freenet.support.BitArray;

 /**
  * Bulk data transfer (not block). Bulk transfer is designed for files which 
may be much bigger than a 
@@ -20,10 +21,38 @@
        final PeerContext peer;
        /** Transfer UID for messages */
        final long uid;
+       /** Blocks we have but haven't sent yet. 0 = block sent or not present, 1 = block present but not sent */
+       final BitArray blocksNotSentButPresent;

        public BulkTransmitter(PartiallyReceivedBulk prb, PeerContext peer, 
long uid) {
                this.prb = prb;
                this.peer = peer;
                this.uid = uid;
+               // Need to sync on prb while doing both operations, to avoid a race condition.
+               // Specifically, we must not get calls to blockReceived() until
+               // blocksNotSentButPresent has been set, AND it must be accurate, so there
+               // must not be an unlocked period between cloning and adding.
+               synchronized(prb) {
+                       // We can just clone it.
+                       blocksNotSentButPresent = prb.cloneBlocksReceived();
+                       prb.add(this);
+               }
        }
+
+       /**
+        * Received a block. Set the relevant bit to 1 to indicate that we have the block
+        * but haven't sent it yet. **Only called by PartiallyReceivedBulk.**
+        * @param block The block number that has been received.
+        */
+       synchronized void blockReceived(int block) {
+               blocksNotSentButPresent.setBit(block, true);
+       }
+
+       /**
+        * Called if the transfer fails because of a throwable.
+        * Not implemented yet: the body is an empty stub, so the failure is
+        * currently ignored by this transmitter.
+        * @param t The throwable causing the failure.
+        */
+       void fail(Throwable t) {
+               // TODO Auto-generated method stub
+       }
 }

Modified: trunk/freenet/src/freenet/io/xfer/PartiallyReceivedBulk.java
===================================================================
--- trunk/freenet/src/freenet/io/xfer/PartiallyReceivedBulk.java        
2007-05-30 19:12:37 UTC (rev 13395)
+++ trunk/freenet/src/freenet/io/xfer/PartiallyReceivedBulk.java        
2007-05-30 19:25:57 UTC (rev 13396)
@@ -4,6 +4,7 @@
 package freenet.io.xfer;

 import freenet.support.BitArray;
+import freenet.support.Logger;
 import freenet.support.io.RandomAccessThing;

 /**
@@ -19,10 +20,13 @@
        final long size;
        /** The size of the blocks sent as packets. */
        final int blockSize;
-       final RandomAccessThing raf;
+       private final RandomAccessThing raf;
        /** Which blocks have been received and written? */
-       final BitArray blocksReceived;
+       private final BitArray blocksReceived;
        final int blocks;
+       private BulkTransmitter[] transmitters;
+       /** The one and only BulkReceiver */
+       private BulkReceiver recv;

        /**
         * Construct a PartiallyReceivedBulk.
@@ -44,4 +48,74 @@
                if(initialState)
                        blocksReceived.setAllOnes();
        }
+
+       /**
+        * Clone the blocksReceived BitArray. Used by BulkTransmitter to find what blocks
+        * are available on creation. BulkTransmitter will have already taken the lock and
+        * will keep it over the add() also.
+        * @return A copy of blocksReceived.
+        */
+       synchronized BitArray cloneBlocksReceived() {
+               return new BitArray(blocksReceived);
+       }
+       
+       /**
+        * Register a BulkTransmitter with this bulk. Every registered transmitter is told
+        * about each block as it comes in.
+        * @param bt The BulkTransmitter to register.
+        */
+       synchronized void add(BulkTransmitter bt) {
+               // First registration: start the array with just this transmitter.
+               if(transmitters == null) {
+                       transmitters = new BulkTransmitter[] { bt };
+                       return;
+               }
+               // Otherwise copy into an array one slot longer and append at the end.
+               int oldLength = transmitters.length;
+               BulkTransmitter[] grown = new BulkTransmitter[oldLength+1];
+               System.arraycopy(transmitters, 0, grown, 0, oldLength);
+               grown[oldLength] = bt;
+               transmitters = grown;
+       }
+       
+       /**
+        * Called when a block has been received. Will copy the data from the provided
+        * buffer and store it, then tell every registered BulkTransmitter about the block.
+        * A block that has already been received is ignored. If storing the block throws,
+        * the transfer is failed and the transmitters are not told the block is available.
+        * @param blockNum The block number.
+        * @param data The byte array from which to read the data.
+        * @param offset The start of the block's data within the data array.
+        */
+       void received(int blockNum, byte[] data, int offset) {
+               BulkTransmitter[] notifyBTs;
+               synchronized(this) {
+                       if(blocksReceived.bitAt(blockNum)) return; // ignore
+                       blocksReceived.setBit(blockNum, true); // assume the rest of the function succeeds
+                       notifyBTs = transmitters;
+               }
+               try {
+                       long fileOffset = (long)blockNum * (long)blockSize;
+                       // The last block may be shorter than blockSize: never write more than
+                       // one block, and never past the end of the file.
+                       int bs = (int) Math.min(blockSize, size - fileOffset);
+                       raf.pwrite(fileOffset, data, offset, bs);
+               } catch (Throwable t) {
+                       fail(t);
+                       // The block was not stored, so don't announce it as available.
+                       return;
+               }
+               if(notifyBTs == null) return;
+               for(int i=0;i<notifyBTs.length;i++) {
+                       // Not a generic callback, so no catch{} guard
+                       notifyBTs[i].blockReceived(blockNum);
+               }
+       }
+
+       /**
+        * Fail the transfer because of an unrecoverable exception, e.g. an error in
+        * storing the data. Logs the failure, then passes it on to every registered
+        * BulkTransmitter and to the BulkReceiver if there is one.
+        * @param t The throwable causing this failure.
+        */
+       private void fail(Throwable t) {
+               Logger.error(this, "Failing transfer: "+this+" : "+t, t);
+               // Take the array reference under the lock; call the transmitters without it.
+               BulkTransmitter[] snapshot;
+               synchronized(this) {
+                       snapshot = transmitters;
+               }
+               int count = (snapshot == null) ? 0 : snapshot.length;
+               for(int i=0;i<count;i++) {
+                       snapshot[i].fail(t);
+               }
+               if(recv != null) {
+                       recv.fail(t);
+               }
+       }
 }

Modified: trunk/freenet/src/freenet/support/BitArray.java
===================================================================
--- trunk/freenet/src/freenet/support/BitArray.java     2007-05-30 19:12:37 UTC 
(rev 13395)
+++ trunk/freenet/src/freenet/support/BitArray.java     2007-05-30 19:25:57 UTC 
(rev 13396)
@@ -41,6 +41,12 @@
                _bits = new byte[(size / 8) + (size % 8 == 0 ? 0 : 1)];
        }

+       /**
+        * Copy constructor. Creates a BitArray with the same size and the same bits as
+        * src, backed by its own byte array.
+        * @param src The BitArray to copy. It is only read, never written.
+        */
+       public BitArray(BitArray src) {
+               this._size = src._size;
+               // _size counts bits while _bits holds one byte per 8 bits, so size the copy
+               // from the source's backing array rather than from _size.
+               this._bits = new byte[src._bits.length];
+               // arraycopy takes the source first and the destination second.
+               System.arraycopy(src._bits, 0, _bits, 0, src._bits.length);
+       }
+       
        public void setBit(int pos, boolean f) {
                int b = unsignedByteToInt(_bits[pos / 8]);
                int mask = (1 << (pos % 8));


Reply via email to