Author: toad
Date: 2007-06-01 11:52:21 +0000 (Fri, 01 Jun 2007)
New Revision: 13436
Modified:
trunk/freenet/src/freenet/io/comm/DMT.java
trunk/freenet/src/freenet/io/xfer/BulkReceiver.java
trunk/freenet/src/freenet/io/xfer/BulkTransmitter.java
Log:
FNPBulkReceivedAll, timeouts, bugfixes
Modified: trunk/freenet/src/freenet/io/comm/DMT.java
===================================================================
--- trunk/freenet/src/freenet/io/comm/DMT.java 2007-06-01 11:34:51 UTC (rev 13435)
+++ trunk/freenet/src/freenet/io/comm/DMT.java 2007-06-01 11:52:21 UTC (rev 13436)
@@ -272,6 +272,16 @@
return msg;
}
+ public static final MessageType FNPBulkReceivedAll = new
MessageType("FNPBulkReceivedAll") {{
+ addField(UID, Long.class);
+ }};
+
+ public static final Message createFNPBulkReceivedAll(long uid) {
+ Message msg = new Message(FNPBulkReceivedAll);
+ msg.set(UID, uid);
+ return msg;
+ }
+
public static final MessageType testTransferSend = new
MessageType("testTransferSend") {{
addField(UID, Long.class);
}};
Modified: trunk/freenet/src/freenet/io/xfer/BulkReceiver.java
===================================================================
--- trunk/freenet/src/freenet/io/xfer/BulkReceiver.java 2007-06-01 11:34:51 UTC (rev 13435)
+++ trunk/freenet/src/freenet/io/xfer/BulkReceiver.java 2007-06-01 11:52:21 UTC (rev 13436)
@@ -57,7 +57,14 @@
MessageFilter mfSendKilled =
MessageFilter.create().setSource(peer).setType(DMT.FNPBulkSendAborted)
.setField(DMT.UID, uid).setTimeout(TIMEOUT);
MessageFilter mfPacket =
MessageFilter.create().setSource(peer).setType(DMT.FNPBulkPacketSend)
.setField(DMT.UID, uid).setTimeout(TIMEOUT);
while(true) {
- if(prb.hasWholeFile()) return true;
+ if(prb.hasWholeFile()) {
+ try {
+
peer.sendAsync(DMT.createFNPBulkReceivedAll(uid), null, 0, null);
+ } catch (NotConnectedException e) {
+ // Ignore, we have the data.
+ }
+ return true;
+ }
Message m;
try {
m = prb.usm.waitFor(mfSendKilled.or(mfPacket),
null);
Modified: trunk/freenet/src/freenet/io/xfer/BulkTransmitter.java
===================================================================
--- trunk/freenet/src/freenet/io/xfer/BulkTransmitter.java 2007-06-01 11:34:51 UTC (rev 13435)
+++ trunk/freenet/src/freenet/io/xfer/BulkTransmitter.java 2007-06-01 11:52:21 UTC (rev 13436)
@@ -12,6 +12,7 @@
import freenet.io.comm.PeerContext;
import freenet.support.BitArray;
import freenet.support.DoubleTokenBucket;
+import freenet.support.Logger;
/**
* Bulk data transfer (not block). Bulk transfer is designed for files which may be much bigger than a
@@ -22,6 +23,8 @@
*/
public class BulkTransmitter {
+ /** If no packets sent in this period, and no completion acknowledgement / cancellation, assume failure. */
+ static final int TIMEOUT = 5*60*1000;
/** Available blocks */
final PartiallyReceivedBulk prb;
/** Peer who we are sending the data to */
@@ -67,6 +70,19 @@
return false;
}
});
+
prb.usm.addAsyncFilter(MessageFilter.create().setNoTimeout().setSource(peer).setType(DMT.FNPBulkReceivedAll).setField(DMT.UID,
uid),
+ new AsyncMessageFilterCallback() {
+ public void onMatched(Message
m) {
+ completed();
+ }
+ public boolean shouldTimeout() {
+
synchronized(BulkTransmitter.this) {
+ if(cancelled ||
finished) return true;
+ }
+
if(BulkTransmitter.this.prb.isAborted()) return true;
+ return false;
+ }
+ });
} catch (DisconnectedException e) {
cancel();
throw e;
@@ -113,12 +129,24 @@
}
prb.remove(this);
}
+
+ /** Like cancel(), but without the negative overtones: The client says it's got everything,
+ * we believe them (even if we haven't sent everything; maybe they had a partial). */
+ public void completed() {
+ synchronized(this) {
+ finished = true;
+ notifyAll();
+ }
+ prb.remove(this);
+ }
/**
* Send the file.
* @return True if the file was successfully sent. False otherwise.
*/
public boolean send() {
+ int packetSize = prb.getPacketSize();
+ long lastSentPacket = System.currentTimeMillis();
while(true) {
if(prb.isAborted()) return false;
int blockNo;
@@ -130,23 +158,27 @@
prb.remove(BulkTransmitter.this);
return false;
}
- boolean hasAll = prb.hasWholeFile();
synchronized(this) {
+ if(finished) return true;
if(cancelled) return false;
blockNo = blocksNotSentButPresent.firstOne();
}
- if(blockNo < 0 && hasAll) {
- prb.remove(BulkTransmitter.this);
- return true; // All done
- } else if(blockNo < 0) {
+ if(blockNo < 0) {
+ // Wait for a packet, BulkReceivedAll or BulkReceiveAborted
synchronized(this) {
try {
wait(60*1000);
} catch (InterruptedException e) {
// No problem
+ continue;
}
- continue;
}
+ long end = System.currentTimeMillis();
+ if(end - lastSentPacket > TIMEOUT) {
+ Logger.error(this, "Send timed out on
"+this);
+ cancel();
+ return false;
+ }
}
// Send a packet
byte[] buf = prb.getBlockData(blockNo);
@@ -159,12 +191,22 @@
long now = System.currentTimeMillis();
long waitUntil = peer.getThrottle().scheduleDelay(now);
- masterThrottle.blockingGrab(prb.getPacketSize());
+ masterThrottle.blockingGrab(packetSize);
while((now = System.currentTimeMillis()) < waitUntil) {
long sleepTime = waitUntil - now;
try {
- Thread.sleep(sleepTime);
+ synchronized(this) {
+ wait(sleepTime);
+ if(finished) {
+
masterThrottle.recycle(packetSize);
+ return true;
+ }
+ if(cancelled) {
+
masterThrottle.recycle(packetSize);
+ return false;
+ }
+ }
} catch (InterruptedException e) {
// Ignore
}
@@ -173,6 +215,10 @@
try {
peer.sendAsync(DMT.createFNPBulkPacketSend(uid,
blockNo, buf), null, 0, null);
+ synchronized(this) {
+ blocksNotSentButPresent.setBit(blockNo,
false);
+ }
+ lastSentPacket = System.currentTimeMillis();
} catch (NotConnectedException e) {
cancel();
return false;