Author: toad
Date: 2008-12-13 20:18:41 +0000 (Sat, 13 Dec 2008)
New Revision: 24297
Modified:
trunk/freenet/src/freenet/io/comm/PeerContext.java
trunk/freenet/src/freenet/io/xfer/BlockTransmitter.java
trunk/freenet/src/freenet/io/xfer/BulkTransmitter.java
trunk/freenet/src/freenet/io/xfer/PacketThrottle.java
trunk/freenet/src/freenet/node/FailureTable.java
trunk/freenet/src/freenet/node/PeerNode.java
trunk/freenet/src/freenet/node/RequestHandler.java
trunk/freenet/src/freenet/node/SSKInsertSender.java
Log:
Add a callback to sendThrottledPacket(), wait for all packets in flight to be
sent before failing with a timeout in a bulk send.
Modified: trunk/freenet/src/freenet/io/comm/PeerContext.java
===================================================================
--- trunk/freenet/src/freenet/io/comm/PeerContext.java 2008-12-13 19:41:21 UTC
(rev 24296)
+++ trunk/freenet/src/freenet/io/comm/PeerContext.java 2008-12-13 20:18:41 UTC
(rev 24297)
@@ -38,7 +38,7 @@
/** Send a throttled message to the node (may block for a long time).
* @throws SyncSendWaitedTooLongException */
- public void sendThrottledMessage(Message msg, int packetSize,
ByteCounter ctr, int timeout, boolean waitForSent) throws
NotConnectedException, WaitedTooLongException, SyncSendWaitedTooLongException;
+ public void sendThrottledMessage(Message msg, int packetSize,
ByteCounter ctr, int timeout, boolean waitForSent, AsyncMessageCallback
callback) throws NotConnectedException, WaitedTooLongException,
SyncSendWaitedTooLongException;
/** Get the current boot ID. This is a random number that changes every
time the node starts up. */
public long getBootID();
Modified: trunk/freenet/src/freenet/io/xfer/BlockTransmitter.java
===================================================================
--- trunk/freenet/src/freenet/io/xfer/BlockTransmitter.java 2008-12-13
19:41:21 UTC (rev 24296)
+++ trunk/freenet/src/freenet/io/xfer/BlockTransmitter.java 2008-12-13
20:18:41 UTC (rev 24297)
@@ -98,7 +98,7 @@
}
int totalPackets;
try {
-
_destination.sendThrottledMessage(DMT.createPacketTransmit(_uid, packetNo,
_sentPackets, _prb.getPacket(packetNo)), _prb._packetSize, _ctr, SEND_TIMEOUT,
false);
+
_destination.sendThrottledMessage(DMT.createPacketTransmit(_uid, packetNo,
_sentPackets, _prb.getPacket(packetNo)), _prb._packetSize, _ctr, SEND_TIMEOUT,
false, null);
totalPackets=_prb.getNumPackets();
} catch (NotConnectedException e) {
Logger.normal(this,
"Terminating send: "+e);
Modified: trunk/freenet/src/freenet/io/xfer/BulkTransmitter.java
===================================================================
--- trunk/freenet/src/freenet/io/xfer/BulkTransmitter.java 2008-12-13
19:41:21 UTC (rev 24296)
+++ trunk/freenet/src/freenet/io/xfer/BulkTransmitter.java 2008-12-13
20:18:41 UTC (rev 24297)
@@ -3,6 +3,7 @@
* http://www.gnu.org/ for further details of the GPL. */
package freenet.io.xfer;
+import freenet.io.comm.AsyncMessageCallback;
import freenet.io.comm.AsyncMessageFilterCallback;
import freenet.io.comm.ByteCounter;
import freenet.io.comm.DMT;
@@ -217,8 +218,24 @@
completed();
return true;
}
- // Wait for a packet, BulkReceivedAll or
BulkReceiveAborted
synchronized(this) {
+ // Wait for all packets to complete
+ while(true) {
+ if(failedPacket) {
+ cancel("Packet send
failed");
+ return false;
+ }
+
if(Logger.shouldLog(Logger.MINOR, this))
+ Logger.minor(this,
"Waiting for packets: remaining: "+inFlightPackets);
+ if(inFlightPackets == 0) break;
+ try {
+ wait();
+ } catch (InterruptedException
e) {
+ // Ignore
+ }
+ }
+
+ // Wait for a packet to come in,
BulkReceivedAll or BulkReceiveAborted
try {
wait(60*1000);
} catch (InterruptedException e) {
@@ -229,7 +246,7 @@
long end = System.currentTimeMillis();
if(end - lastSentPacket > TIMEOUT) {
Logger.error(this, "Send timed out on
"+this);
- cancel("Timeout");
+ cancel("Timeout awaiting
BulkReceivedAll");
return false;
}
continue;
@@ -245,7 +262,8 @@
// Congestion control and bandwidth limiting
try {
-
peer.sendThrottledMessage(DMT.createFNPBulkPacketSend(uid, blockNo, buf),
buf.length, ctr, BulkReceiver.TIMEOUT, false);
+ if(logMINOR) Logger.minor(this, "Sending packet
"+blockNo);
+
peer.sendThrottledMessage(DMT.createFNPBulkPacketSend(uid, blockNo, buf),
buf.length, ctr, BulkReceiver.TIMEOUT, false, new UnsentPacketTag());
synchronized(this) {
blocksNotSentButPresent.setBit(blockNo,
false);
}
@@ -266,6 +284,56 @@
}
}
+ private int inFlightPackets = 0;
+ private boolean failedPacket = false;
+
+ private class UnsentPacketTag implements AsyncMessageCallback {
+
+ private boolean finished;
+
+ private UnsentPacketTag() {
+ synchronized(BulkTransmitter.this) {
+ inFlightPackets++;
+ }
+ }
+
+ public void acknowledged() {
+ complete(false);
+ }
+
+ private void complete(boolean failed) {
+ synchronized(this) {
+ if(finished) return;
+ finished = true;
+ }
+ synchronized(BulkTransmitter.this) {
+ if(failed) {
+ failedPacket = true;
+ BulkTransmitter.this.notifyAll();
+ if(Logger.shouldLog(Logger.MINOR,
this)) Logger.minor(this, "Packet failed for "+BulkTransmitter.this);
+ } else {
+ inFlightPackets--;
+ if(inFlightPackets <= 0)
+
BulkTransmitter.this.notifyAll();
+ if(Logger.shouldLog(Logger.MINOR,
this)) Logger.minor(this, "Packet sent "+BulkTransmitter.this+" remaining in
flight: "+inFlightPackets);
+ }
+ }
+ }
+
+ public void disconnected() {
+ complete(true);
+ }
+
+ public void fatalError() {
+ complete(true);
+ }
+
+ public void sent() {
+ // Wait for acknowledgment
+ }
+
+ }
+
@Override
public String toString() {
return "BulkTransmitter:"+uid+":"+peer.shortToString();
Modified: trunk/freenet/src/freenet/io/xfer/PacketThrottle.java
===================================================================
--- trunk/freenet/src/freenet/io/xfer/PacketThrottle.java 2008-12-13
19:41:21 UTC (rev 24296)
+++ trunk/freenet/src/freenet/io/xfer/PacketThrottle.java 2008-12-13
20:18:41 UTC (rev 24297)
@@ -146,7 +146,7 @@
return ((PACKET_SIZE * 1000.0 / getDelay()));
}
- public void sendThrottledMessage(Message msg, PeerContext peer, int
packetSize, ByteCounter ctr, long deadline, boolean blockForSend) throws
NotConnectedException, ThrottleDeprecatedException, WaitedTooLongException,
SyncSendWaitedTooLongException {
+ public void sendThrottledMessage(Message msg, PeerContext peer, int
packetSize, ByteCounter ctr, long deadline, boolean blockForSend,
AsyncMessageCallback cbForAsyncSend) throws NotConnectedException,
ThrottleDeprecatedException, WaitedTooLongException,
SyncSendWaitedTooLongException {
long start = System.currentTimeMillis();
long bootID = peer.getBootID();
synchronized(this) {
@@ -226,7 +226,7 @@
Logger.error(this, "Congestion control wait time:
"+waitTime+" for "+this);
else if(logMINOR)
Logger.minor(this, "Congestion control wait time:
"+waitTime+" for "+this);
- MyCallback callback = new MyCallback();
+ MyCallback callback = new MyCallback(cbForAsyncSend);
try {
peer.sendAsync(msg, callback, ctr);
ctr.sentPayload(packetSize);
@@ -266,6 +266,12 @@
private boolean finished = false;
+ private AsyncMessageCallback chainCallback;
+
+ public MyCallback(AsyncMessageCallback cbForAsyncSend) {
+ this.chainCallback = cbForAsyncSend;
+ }
+
public void acknowledged() {
synchronized(PacketThrottle.this) {
if(finished) {
@@ -277,6 +283,7 @@
PacketThrottle.this.notifyAll();
}
if(logMINOR) Logger.minor(this, "Removed packet: acked
for "+this);
+ if(chainCallback != null) chainCallback.acknowledged();
}
public void disconnected() {
@@ -287,6 +294,7 @@
PacketThrottle.this.notifyAll();
}
if(logMINOR) Logger.minor(this, "Removed packet:
disconnected for "+this);
+ if(chainCallback != null) chainCallback.disconnected();
}
public void fatalError() {
@@ -297,6 +305,7 @@
PacketThrottle.this.notifyAll();
}
if(logMINOR) Logger.minor(this, "Removed packet: error
for "+this);
+ if(chainCallback != null) chainCallback.fatalError();
}
public void sent() {
Modified: trunk/freenet/src/freenet/node/FailureTable.java
===================================================================
--- trunk/freenet/src/freenet/node/FailureTable.java 2008-12-13 19:41:21 UTC
(rev 24296)
+++ trunk/freenet/src/freenet/node/FailureTable.java 2008-12-13 20:18:41 UTC
(rev 24297)
@@ -424,7 +424,7 @@
public void run() {
try {
-
source.sendThrottledMessage(data, dataLength, senderCounter, 60*1000, false);
+
source.sendThrottledMessage(data, dataLength, senderCounter, 60*1000, false,
null);
} catch (NotConnectedException e) {
// :(
} catch (WaitedTooLongException e) {
Modified: trunk/freenet/src/freenet/node/PeerNode.java
===================================================================
--- trunk/freenet/src/freenet/node/PeerNode.java 2008-12-13 19:41:21 UTC
(rev 24296)
+++ trunk/freenet/src/freenet/node/PeerNode.java 2008-12-13 20:18:41 UTC
(rev 24297)
@@ -4044,12 +4044,12 @@
return resendBytesSent;
}
- public void sendThrottledMessage(Message msg, int packetSize,
ByteCounter ctr, int timeout, boolean blockForSend) throws
NotConnectedException, WaitedTooLongException, SyncSendWaitedTooLongException {
+ public void sendThrottledMessage(Message msg, int packetSize,
ByteCounter ctr, int timeout, boolean blockForSend, AsyncMessageCallback
callback) throws NotConnectedException, WaitedTooLongException,
SyncSendWaitedTooLongException {
long deadline = System.currentTimeMillis() + timeout;
if(logMINOR) Logger.minor(this, "Sending throttled message with
timeout "+timeout+" packet size "+packetSize+" to "+shortToString());
for(int i=0;i<100;i++) {
try {
- getThrottle().sendThrottledMessage(msg, this,
packetSize, ctr, deadline, blockForSend);
+ getThrottle().sendThrottledMessage(msg, this,
packetSize, ctr, deadline, blockForSend, callback);
return;
} catch (ThrottleDeprecatedException e) {
// Try with the new throttle. We don't need it,
we'll get it from getThrottle().
@@ -4058,6 +4058,7 @@
}
Logger.error(this, "Peer constantly changes its IP address!!:
"+shortToString());
forceDisconnect(true);
+ throw new NotConnectedException();
}
/**
Modified: trunk/freenet/src/freenet/node/RequestHandler.java
===================================================================
--- trunk/freenet/src/freenet/node/RequestHandler.java 2008-12-13 19:41:21 UTC
(rev 24296)
+++ trunk/freenet/src/freenet/node/RequestHandler.java 2008-12-13 20:18:41 UTC
(rev 24297)
@@ -349,7 +349,7 @@
public void run() {
try {
- source.sendThrottledMessage(dataMsg,
data.length, RequestHandler.this, 60 * 1000, true);
+ source.sendThrottledMessage(dataMsg,
data.length, RequestHandler.this, 60 * 1000, true, null);
applyByteCounts();
} catch(NotConnectedException e) {
// Okay
@@ -383,7 +383,7 @@
source.sendAsync(headersMsg, null, ctr);
final Message dataMsg = DMT.createFNPSSKDataFoundData(uid,
data);
try {
- source.sendThrottledMessage(dataMsg, data.length, ctr,
60 * 1000, false);
+ source.sendThrottledMessage(dataMsg, data.length, ctr,
60 * 1000, false, null);
} catch(SyncSendWaitedTooLongException e) {
// Impossible
throw new Error(e);
Modified: trunk/freenet/src/freenet/node/SSKInsertSender.java
===================================================================
--- trunk/freenet/src/freenet/node/SSKInsertSender.java 2008-12-13 19:41:21 UTC
(rev 24296)
+++ trunk/freenet/src/freenet/node/SSKInsertSender.java 2008-12-13 20:18:41 UTC
(rev 24297)
@@ -249,7 +249,7 @@
try {
next.sendAsync(headersMsg, null, this);
- next.sendThrottledMessage(dataMsg, data.length,
this, SSKInsertHandler.DATA_INSERT_TIMEOUT, false);
+ next.sendThrottledMessage(dataMsg, data.length,
this, SSKInsertHandler.DATA_INSERT_TIMEOUT, false, null);
} catch (NotConnectedException e1) {
if(logMINOR) Logger.minor(this, "Not connected
to "+next);
continue;
_______________________________________________
cvs mailing list
[email protected]
http://emu.freenetproject.org/cgi-bin/mailman/listinfo/cvs