Author: toad
Date: 2007-05-30 19:25:57 +0000 (Wed, 30 May 2007)
New Revision: 13396
Modified:
trunk/freenet/src/freenet/io/xfer/BulkReceiver.java
trunk/freenet/src/freenet/io/xfer/BulkTransmitter.java
trunk/freenet/src/freenet/io/xfer/PartiallyReceivedBulk.java
trunk/freenet/src/freenet/support/BitArray.java
Log:
More work on bulk transfer
Modified: trunk/freenet/src/freenet/io/xfer/BulkReceiver.java
===================================================================
--- trunk/freenet/src/freenet/io/xfer/BulkReceiver.java 2007-05-30 19:12:37 UTC
(rev 13395)
+++ trunk/freenet/src/freenet/io/xfer/BulkReceiver.java 2007-05-30 19:25:57 UTC
(rev 13396)
@@ -24,4 +24,12 @@
this.peer = peer;
this.uid = uid;
}
+
+ /**
+ * Called when the transfer fails because of a Throwable being thrown.
+ * @param t The throwable.
+ */
+ public void fail(Throwable t) {
+ // TODO Auto-generated method stub
+ }
}
Modified: trunk/freenet/src/freenet/io/xfer/BulkTransmitter.java
===================================================================
--- trunk/freenet/src/freenet/io/xfer/BulkTransmitter.java 2007-05-30
19:12:37 UTC (rev 13395)
+++ trunk/freenet/src/freenet/io/xfer/BulkTransmitter.java 2007-05-30
19:25:57 UTC (rev 13396)
@@ -4,6 +4,7 @@
package freenet.io.xfer;
import freenet.io.comm.PeerContext;
+import freenet.support.BitArray;
/**
* Bulk data transfer (not block). Bulk transfer is designed for files which
may be much bigger than a
@@ -20,10 +21,38 @@
final PeerContext peer;
/** Transfer UID for messages */
final long uid;
+ /** Blocks we have but haven't sent yet. 0 = block sent or not present,
1 = block present but not sent */
+ final BitArray blocksNotSentButPresent;
public BulkTransmitter(PartiallyReceivedBulk prb, PeerContext peer,
long uid) {
this.prb = prb;
this.peer = peer;
this.uid = uid;
+ // Need to sync on prb while doing both operations, to avoid
race condition.
+ // Specifically, we must not get calls to blockReceived() until
blocksNotSentButPresent
+ // has been set, AND it must be accurate, so there must not be
an unlocked period
+ // between cloning and adding.
+ synchronized(prb) {
+ // We can just clone it.
+ blocksNotSentButPresent = prb.cloneBlocksReceived();
+ prb.add(this);
+ }
}
+
+ /**
+ * Received a block. Set the relevant bit to 1 to indicate that we have
the block but haven't sent
+ * it yet. **Only called by PartiallyReceivedBulk.**
+ * @param block The block number that has been received.
+ */
+ synchronized void blockReceived(int block) {
+ blocksNotSentButPresent.setBit(block, true);
+ }
+
+ /**
+ * Called if the transfer fails because of a throwable.
+ * @param t The throwable causing the failure.
+ */
+ void fail(Throwable t) {
+ // TODO Auto-generated method stub
+ }
}
Modified: trunk/freenet/src/freenet/io/xfer/PartiallyReceivedBulk.java
===================================================================
--- trunk/freenet/src/freenet/io/xfer/PartiallyReceivedBulk.java
2007-05-30 19:12:37 UTC (rev 13395)
+++ trunk/freenet/src/freenet/io/xfer/PartiallyReceivedBulk.java
2007-05-30 19:25:57 UTC (rev 13396)
@@ -4,6 +4,7 @@
package freenet.io.xfer;
import freenet.support.BitArray;
+import freenet.support.Logger;
import freenet.support.io.RandomAccessThing;
/**
@@ -19,10 +20,13 @@
final long size;
/** The size of the blocks sent as packets. */
final int blockSize;
- final RandomAccessThing raf;
+ private final RandomAccessThing raf;
/** Which blocks have been received and written? */
- final BitArray blocksReceived;
+ private final BitArray blocksReceived;
final int blocks;
+ private BulkTransmitter[] transmitters;
+ /** The one and only BulkReceiver */
+ private BulkReceiver recv;
/**
* Construct a PartiallyReceivedBulk.
@@ -44,4 +48,74 @@
if(initialState)
blocksReceived.setAllOnes();
}
+
+ /**
+ * Clone the blocksReceived BitArray. Used by BulkTransmitter to find
what blocks are available on
+ * creation. BulkTransmitter will have already taken the lock and will
keep it over the add() also.
+ * @return A copy of blocksReceived.
+ */
+ synchronized BitArray cloneBlocksReceived() {
+ return new BitArray(blocksReceived);
+ }
+
+ /**
+ * Add a BulkTransmitter to the list of BulkTransmitters. When a block
comes in, we will tell each
+ * BulkTransmitter about it.
+ * @param bt The BulkTransmitter to register.
+ */
+ synchronized void add(BulkTransmitter bt) {
+ if(transmitters == null)
+ transmitters = new BulkTransmitter[] { bt };
+ else {
+ BulkTransmitter[] t = new
BulkTransmitter[transmitters.length+1];
+ System.arraycopy(transmitters, 0, t, 0,
transmitters.length);
+ t[transmitters.length] = bt;
+ transmitters = t;
+ }
+ }
+
+ /**
+ * Called when a block has been received. Will copy the data from the
provided buffer and store it.
+ * @param blockNum The block number.
+ * @param data The byte array from which to read the data.
+ * @param offset The start of the
+ */
+ void received(int blockNum, byte[] data, int offset) {
+ BulkTransmitter[] notifyBTs;
+ synchronized(this) {
+ if(blocksReceived.bitAt(blockNum)) return; // ignore
+ blocksReceived.setBit(blockNum, true); // assume the
rest of the function succeeds
+ notifyBTs = transmitters;
+ }
+ try {
+ long fileOffset = (long)blockNum * (long)blockSize;
+ int bs = (int) Math.max(blockSize, size - fileOffset);
+ raf.pwrite(fileOffset, data, offset, bs);
+ } catch (Throwable t) {
+ fail(t);
+ }
+ if(notifyBTs == null) return;
+ for(int i=0;i<notifyBTs.length;i++) {
+ // Not a generic callback, so no catch{} guard
+ notifyBTs[i].blockReceived(blockNum);
+ }
+ }
+
+ /**
+ * Fail the transfer because of an unrecoverable exception e.g. an
error in storing the data.
+ * @param t The throwable causing this failure.
+ */
+ private void fail(Throwable t) {
+ Logger.error(this, "Failing transfer: "+this+" : "+t, t);
+ BulkTransmitter[] notifyBTs;
+ synchronized(this) {
+ notifyBTs = transmitters;
+ }
+ if(notifyBTs != null) {
+ for(int i=0;i<notifyBTs.length;i++)
+ notifyBTs[i].fail(t);
+ }
+ if(recv != null)
+ recv.fail(t);
+ }
}
Modified: trunk/freenet/src/freenet/support/BitArray.java
===================================================================
--- trunk/freenet/src/freenet/support/BitArray.java 2007-05-30 19:12:37 UTC
(rev 13395)
+++ trunk/freenet/src/freenet/support/BitArray.java 2007-05-30 19:25:57 UTC
(rev 13396)
@@ -41,6 +41,12 @@
_bits = new byte[(size / 8) + (size % 8 == 0 ? 0 : 1)];
}
+ public BitArray(BitArray src) {
+ this._size = src._size;
+ this._bits = new byte[src._size];
+ System.arraycopy(_bits, 0, src._bits, 0, _bits.length);
+ }
+
public void setBit(int pos, boolean f) {
int b = unsignedByteToInt(_bits[pos / 8]);
int mask = (1 << (pos % 8));