Author: toad
Date: 2007-05-30 22:57:52 +0000 (Wed, 30 May 2007)
New Revision: 13405
Modified:
trunk/freenet/src/freenet/io/comm/DMT.java
trunk/freenet/src/freenet/io/comm/PeerContext.java
trunk/freenet/src/freenet/io/xfer/BlockTransmitter.java
trunk/freenet/src/freenet/io/xfer/BulkTransmitter.java
trunk/freenet/src/freenet/io/xfer/PartiallyReceivedBulk.java
trunk/freenet/src/freenet/node/PeerNode.java
trunk/freenet/src/freenet/support/BitArray.java
Log:
More work on bulk transmission.
Just need to code the receiver, and wire it in, and test it...
Modified: trunk/freenet/src/freenet/io/comm/DMT.java
===================================================================
--- trunk/freenet/src/freenet/io/comm/DMT.java 2007-05-30 22:54:39 UTC (rev
13404)
+++ trunk/freenet/src/freenet/io/comm/DMT.java 2007-05-30 22:57:52 UTC (rev
13405)
@@ -184,6 +184,10 @@
BitArray.serializedLength(_packets) + 4 /* Message
header */;
}
+ public static int bulkPacketTransmitSize(int size) {
+ return size + 8 /* uid */ + 4 /* packet# */ + 4 /* Message
header */;
+ }
+
public static final MessageType allSent = new MessageType("allSent") {{
addField(UID, Long.class);
}};
Modified: trunk/freenet/src/freenet/io/comm/PeerContext.java
===================================================================
--- trunk/freenet/src/freenet/io/comm/PeerContext.java 2007-05-30 22:54:39 UTC
(rev 13404)
+++ trunk/freenet/src/freenet/io/comm/PeerContext.java 2007-05-30 22:57:52 UTC
(rev 13405)
@@ -3,6 +3,8 @@
* http://www.gnu.org/ for further details of the GPL. */
package freenet.io.comm;
+import freenet.io.xfer.PacketThrottle;
+
/**
* @author amphibian
*
@@ -27,4 +29,11 @@
/** Send a message to the node */
public void sendAsync(Message msg, AsyncMessageCallback cb, int
alreadyReportedBytes, ByteCounter ctr) throws NotConnectedException;
+
+ /** Get the current boot ID. This is a random number that changes every
time the node starts up. */
+ public long getBootID();
+
+ /** Get the PacketThrottle for the node's current address for the
standard packet size (if the
+ * address changes then we get a new throttle). */
+ public PacketThrottle getThrottle();
}
Modified: trunk/freenet/src/freenet/io/xfer/BlockTransmitter.java
===================================================================
--- trunk/freenet/src/freenet/io/xfer/BlockTransmitter.java 2007-05-30
22:54:39 UTC (rev 13404)
+++ trunk/freenet/src/freenet/io/xfer/BlockTransmitter.java 2007-05-30
22:57:52 UTC (rev 13405)
@@ -75,7 +75,7 @@
Logger.error(this, "Aborted during setup");
// Will throw on running
}
- throttle = PacketThrottle.getThrottle(_destination.getPeer(),
_prb._packetSize);
+ throttle = _destination.getThrottle();
_senderThread = new Thread("_senderThread for "+_uid+ " to
"+_destination.getPeer()) {
public void run() {
Modified: trunk/freenet/src/freenet/io/xfer/BulkTransmitter.java
===================================================================
--- trunk/freenet/src/freenet/io/xfer/BulkTransmitter.java 2007-05-30
22:54:39 UTC (rev 13404)
+++ trunk/freenet/src/freenet/io/xfer/BulkTransmitter.java 2007-05-30
22:57:52 UTC (rev 13405)
@@ -7,6 +7,7 @@
import freenet.io.comm.NotConnectedException;
import freenet.io.comm.PeerContext;
import freenet.support.BitArray;
+import freenet.support.DoubleTokenBucket;
/**
* Bulk data transfer (not block). Bulk transfer is designed for files which
may be much bigger than a
@@ -25,11 +26,19 @@
final long uid;
/** Blocks we have but haven't sent yet. 0 = block sent or not present,
1 = block present but not sent */
final BitArray blocksNotSentButPresent;
+ private boolean cancelled;
+ /** Not persistent over reboots */
+ final long peerBootID;
+ /** The overall hard bandwidth limiter */
+ final DoubleTokenBucket masterThrottle;
+
- public BulkTransmitter(PartiallyReceivedBulk prb, PeerContext peer,
long uid) {
+ public BulkTransmitter(PartiallyReceivedBulk prb, PeerContext peer,
long uid, DoubleTokenBucket masterThrottle) {
this.prb = prb;
this.peer = peer;
this.uid = uid;
+ this.masterThrottle = masterThrottle;
+ peerBootID = peer.getBootID();
// Need to sync on prb while doing both operations, to avoid
race condition.
// Specifically, we must not get calls to blockReceived() until
blocksNotSentButPresent
// has been set, AND it must be accurate, so there must not be
an unlocked period
@@ -48,6 +57,7 @@
*/
synchronized void blockReceived(int block) {
blocksNotSentButPresent.setBit(block, true);
+ notifyAll();
}
/**
@@ -59,6 +69,88 @@
} catch (NotConnectedException e) {
// Cool
}
+ synchronized(this) {
+ notifyAll();
+ }
}
+ public void cancel() {
+ try {
+ peer.sendAsync(DMT.createFNPBulkSendAborted(uid), null,
0, null);
+ } catch (NotConnectedException e) {
+ // Cool
+ }
+ synchronized(this) {
+ cancelled = true;
+ notifyAll();
+ }
+ prb.remove(this);
+ }
+
+ class Sender implements Runnable {
+
+ public void run() {
+ while(true) {
+ if(prb.isAborted()) return;
+ int blockNo;
+ if(peer.getBootID() != peerBootID) {
+ synchronized(this) {
+ cancelled = true;
+ notifyAll();
+ }
+ prb.remove(BulkTransmitter.this);
+ return;
+ }
+ boolean hasAll = prb.hasWholeFile();
+ synchronized(this) {
+ if(cancelled) return;
+ blockNo =
blocksNotSentButPresent.firstOne();
+ }
+ if(blockNo < 0 && hasAll) {
+ prb.remove(BulkTransmitter.this);
+ return; // All done
+ }
+ else if(blockNo < 0) {
+ synchronized(this) {
+ try {
+ wait(60*1000);
+ } catch (InterruptedException
e) {
+ // No problem
+ }
+ continue;
+ }
+ }
+ // Send a packet
+ byte[] buf = prb.getBlockData(blockNo);
+ if(buf == null) {
+ // Already cancelled, quit
+ return;
+ }
+
+ // Congestion control and bandwidth limiting
+ long now = System.currentTimeMillis();
+ long waitUntil =
peer.getThrottle().scheduleDelay(now);
+
+
masterThrottle.blockingGrab(prb.getPacketSize());
+
+ while((now = System.currentTimeMillis()) <
waitUntil) {
+ long sleepTime = waitUntil - now;
+ try {
+ Thread.sleep(sleepTime);
+ } catch (InterruptedException e) {
+ // Ignore
+ }
+ }
+ // FIXME should this be reported on
bwlimitDelayTime ???
+
+ try {
+
peer.sendAsync(DMT.createFNPBulkPacketSend(uid, blockNo, buf), null, 0, null);
+ } catch (NotConnectedException e) {
+ cancel();
+ return;
+ }
+ }
+ }
+
+ }
}
Modified: trunk/freenet/src/freenet/io/xfer/PartiallyReceivedBulk.java
===================================================================
--- trunk/freenet/src/freenet/io/xfer/PartiallyReceivedBulk.java
2007-05-30 22:54:39 UTC (rev 13404)
+++ trunk/freenet/src/freenet/io/xfer/PartiallyReceivedBulk.java
2007-05-30 22:57:52 UTC (rev 13405)
@@ -3,7 +3,11 @@
* http://www.gnu.org/ for further details of the GPL. */
package freenet.io.xfer;
+import java.io.IOException;
+
+import freenet.io.comm.DMT;
import freenet.io.comm.RetrievalException;
+import freenet.node.FNPPacketMangler;
import freenet.support.BitArray;
import freenet.support.Logger;
import freenet.support.io.RandomAccessThing;
@@ -28,6 +32,8 @@
private BulkTransmitter[] transmitters;
/** The one and only BulkReceiver */
private BulkReceiver recv;
+ private int blocksReceivedCount;
+ final int packetSize;
// Abort status
boolean _aborted;
int _abortReason;
@@ -50,8 +56,12 @@
throw new IllegalArgumentException("Too big");
this.blocks = (int)blocks;
blocksReceived = new BitArray(this.blocks);
- if(initialState)
+ if(initialState) {
blocksReceived.setAllOnes();
+ blocksReceivedCount = this.blocks;
+ }
+ packetSize = DMT.bulkPacketTransmitSize(blockSize) +
+ FNPPacketMangler.FULL_HEADERS_LENGTH_ONE_MESSAGE; //
FIXME generalise
}
/**
@@ -90,6 +100,7 @@
synchronized(this) {
if(blocksReceived.bitAt(blockNum)) return; // ignore
blocksReceived.setBit(blockNum, true); // assume the
rest of the function succeeds
+ blocksReceivedCount++;
notifyBTs = transmitters;
}
try {
@@ -126,21 +137,40 @@
notifyBR.onAborted();
}
-// /**
-// * Fail the transfer because of an unrecoverable exception e.g. an
error in storing the data.
-// * @param t The throwable causing this failure.
-// */
-// private void fail(Throwable t) {
-// Logger.error(this, "Failing transfer: "+this+" : "+t, t);
-// BulkTransmitter[] notifyBTs;
-// synchronized(this) {
-// notifyBTs = transmitters;
-// }
-// if(notifyBTs != null) {
-// for(int i=0;i<notifyBTs.length;i++)
-// notifyBTs[i].fail(t);
-// }
-// if(recv != null)
-// recv.fail(t);
-// }
+ public synchronized boolean isAborted() {
+ return _aborted;
+ }
+
+ public int getPacketSize() {
+ return packetSize;
+ }
+
+ public boolean hasWholeFile() {
+ return blocksReceivedCount >= blocks;
+ }
+
+ public byte[] getBlockData(int blockNum) {
+ long fileOffset = (long)blockNum * (long)blockSize;
+ int bs = (int) Math.max(blockSize, size - fileOffset);
+ byte[] data = new byte[bs];
+ try {
+ raf.pread(fileOffset, data, 0, bs);
+ } catch (IOException e) {
+ Logger.error(this, "Failed to read stored block
"+blockNum+" on "+this+" : "+e, e);
+ abort(RetrievalException.IO_ERROR, e.toString());
+ return null;
+ }
+ return data;
+ }
+
+ public synchronized void remove(BulkTransmitter remove) {
+ BulkTransmitter[] newTrans = new
BulkTransmitter[transmitters.length-1];
+ int j = 0;
+ for(int i=0;i<transmitters.length;i++) {
+ BulkTransmitter t = transmitters[i];
+ if(t == remove) continue;
+ newTrans[j++] = t;
+ }
+ transmitters = newTrans;
+ }
}
Modified: trunk/freenet/src/freenet/node/PeerNode.java
===================================================================
--- trunk/freenet/src/freenet/node/PeerNode.java 2007-05-30 22:54:39 UTC
(rev 13404)
+++ trunk/freenet/src/freenet/node/PeerNode.java 2007-05-30 22:57:52 UTC
(rev 13405)
@@ -1572,6 +1572,10 @@
return true;
}
+ public long getBootID() {
+ return bootID;
+ }
+
private volatile Object arkFetcherSync = new Object();
void startARKFetcher() {
Modified: trunk/freenet/src/freenet/support/BitArray.java
===================================================================
--- trunk/freenet/src/freenet/support/BitArray.java 2007-05-30 22:54:39 UTC
(rev 13404)
+++ trunk/freenet/src/freenet/support/BitArray.java 2007-05-30 22:57:52 UTC
(rev 13405)
@@ -116,4 +116,17 @@
for(int i=0;i<_bits.length;i++)
_bits[i] = (byte)0xFF;
}
+
+ public int firstOne() {
+ for(int i=0;i<_bits.length;i++) {
+ byte b = _bits[i];
+ if(b == 0) continue;
+ for(int j=0;j<8;j++) {
+ int mask = (1 << j);
+ if((b & mask) != 0)
+ return i*8+j;
+ }
+ }
+ return -1;
+ }
}
\ No newline at end of file