Author: toad
Date: 2007-06-01 11:34:51 +0000 (Fri, 01 Jun 2007)
New Revision: 13435
Modified:
trunk/freenet/src/freenet/io/xfer/BulkReceiver.java
trunk/freenet/src/freenet/io/xfer/PartiallyReceivedBulk.java
Log:
Completed receiving code.
Modified: trunk/freenet/src/freenet/io/xfer/BulkReceiver.java
===================================================================
--- trunk/freenet/src/freenet/io/xfer/BulkReceiver.java 2007-06-01 11:15:19 UTC
(rev 13434)
+++ trunk/freenet/src/freenet/io/xfer/BulkReceiver.java 2007-06-01 11:34:51 UTC
(rev 13435)
@@ -4,8 +4,13 @@
package freenet.io.xfer;
import freenet.io.comm.DMT;
+import freenet.io.comm.DisconnectedException;
+import freenet.io.comm.Message;
+import freenet.io.comm.MessageFilter;
import freenet.io.comm.NotConnectedException;
import freenet.io.comm.PeerContext;
+import freenet.io.comm.RetrievalException;
+import freenet.support.ShortBuffer;
/**
* Bulk (not block) data transfer - receiver class. Bulk transfer is designed
for largish files, much
@@ -13,7 +18,8 @@
* @author toad
*/
public class BulkReceiver {
-
+
+ static final int TIMEOUT = 5*60*1000;
/** Tracks the data we have received */
final PartiallyReceivedBulk prb;
/** Peer we are receiving from */
@@ -21,11 +27,14 @@
/** Transfer UID for messages */
final long uid;
private boolean sentCancel;
+ /** Not persistent over reboots */
+ final long peerBootID;
public BulkReceiver(PartiallyReceivedBulk prb, PeerContext peer, long
uid) {
this.prb = prb;
this.peer = peer;
this.uid = uid;
+ this.peerBootID = peer.getBootID(); // snapshot taken at construction; receive() compares the peer's current boot ID against it
}
public void onAborted() {
@@ -40,4 +49,39 @@
}
}
+ /**
+ * Receive the file: loops waiting for FNPBulkPacketSend or FNPBulkSendAborted messages from the peer carrying this transfer's uid, passing each packet's data to prb.received(). Calls prb.abort() and stops on disconnection, a changed peer boot ID, a null result from waitFor (treated as a timeout), or a sender abort message.
+ * @return True once prb.hasWholeFile() reports true; false if the transfer was given up, in which case prb.abort() has already been called with the reason.
+ */
+ public boolean receive() {
+ MessageFilter mfSendKilled =
MessageFilter.create().setSource(peer).setType(DMT.FNPBulkSendAborted)
.setField(DMT.UID, uid).setTimeout(TIMEOUT);
+ MessageFilter mfPacket =
MessageFilter.create().setSource(peer).setType(DMT.FNPBulkPacketSend)
.setField(DMT.UID, uid).setTimeout(TIMEOUT);
+ while(true) { // each pass handles at most one incoming message; every exit is a return
+ if(prb.hasWholeFile()) return true; // the only success exit, checked before waiting again
+ Message m;
+ try {
+ m = prb.usm.waitFor(mfSendKilled.or(mfPacket),
null);
+ } catch (DisconnectedException e) {
+
prb.abort(RetrievalException.SENDER_DISCONNECTED, "Sender disconnected");
+ return false;
+ }
+ if(peer.getBootID() != peerBootID) { // boot ID differs from the one captured in the constructor
+ prb.abort(RetrievalException.SENDER_DIED,
"Sender restarted");
+ return false;
+ }
+ if(m == null) { // no message matched either filter; handled as a timeout
+ prb.abort(RetrievalException.TIMED_OUT, "Sender
timeout");
+ return false;
+ }
+ if(m.getSpec() == DMT.FNPBulkSendAborted) { // the sender reported that it aborted the send
+ prb.abort(RetrievalException.SENDER_DIED,
"Sender cancelled send");
+ return false;
+ }
+ if(m.getSpec() == DMT.FNPBulkPacketSend) { // data packet: pass the whole payload to prb, then loop
+ int packetNo = m.getInt(DMT.PACKET_NO);
+ byte[] data = ((ShortBuffer)
m.getObject(DMT.DATA)).getData();
+ prb.received(packetNo, data, 0, data.length);
+ }
+ }
+ }
}
Modified: trunk/freenet/src/freenet/io/xfer/PartiallyReceivedBulk.java
===================================================================
--- trunk/freenet/src/freenet/io/xfer/PartiallyReceivedBulk.java
2007-06-01 11:15:19 UTC (rev 13434)
+++ trunk/freenet/src/freenet/io/xfer/PartiallyReceivedBulk.java
2007-06-01 11:34:51 UTC (rev 13435)
@@ -96,10 +96,18 @@
* Called when a block has been received. Will copy the data from the
provided buffer and store it.
* @param blockNum The block number.
* @param data The byte array from which to read the data.
- * @param offset The start of the
+ * @param offset The start of the data in the buffer.
*/
- void received(int blockNum, byte[] data, int offset) {
+ void received(int blockNum, byte[] data, int offset, int length) {
BulkTransmitter[] notifyBTs;
+ long fileOffset = (long)blockNum * (long)blockSize;
+ int bs = (int) Math.max(blockSize, size - fileOffset);
+ if(length < bs) {
+ String err = "Data too short! Should be "+bs+" actually
"+length;
+ Logger.error(this, err+" for "+this);
+ abort(RetrievalException.PREMATURE_EOF, err);
+ return;
+ }
synchronized(this) {
if(blocksReceived.bitAt(blockNum)) return; // ignore
blocksReceived.setBit(blockNum, true); // assume the
rest of the function succeeds
@@ -107,8 +115,6 @@
notifyBTs = transmitters;
}
try {
- long fileOffset = (long)blockNum * (long)blockSize;
- int bs = (int) Math.max(blockSize, size - fileOffset);
raf.pwrite(fileOffset, data, offset, bs);
} catch (Throwable t) {
Logger.error(this, "Failed to store received block
"+blockNum+" on "+this+" : "+t, t);