Author: toad
Date: 2008-03-04 14:00:22 +0000 (Tue, 04 Mar 2008)
New Revision: 18332
Modified:
trunk/freenet/src/freenet/io/comm/PeerContext.java
trunk/freenet/src/freenet/io/xfer/BlockTransmitter.java
trunk/freenet/src/freenet/io/xfer/BulkTransmitter.java
trunk/freenet/src/freenet/node/CHKInsertSender.java
trunk/freenet/src/freenet/node/DarknetPeerNode.java
trunk/freenet/src/freenet/node/FailureTable.java
trunk/freenet/src/freenet/node/OpennetManager.java
trunk/freenet/src/freenet/node/PeerNode.java
trunk/freenet/src/freenet/node/RequestHandler.java
trunk/freenet/src/freenet/node/updater/UpdateOverMandatoryManager.java
Log:
PeerNode.sendThrottledPacket(). Make it, use it, drop throttles from
Block/BulkTransmitters.
Modified: trunk/freenet/src/freenet/io/comm/PeerContext.java
===================================================================
--- trunk/freenet/src/freenet/io/comm/PeerContext.java 2008-03-04 13:53:37 UTC
(rev 18331)
+++ trunk/freenet/src/freenet/io/comm/PeerContext.java 2008-03-04 14:00:22 UTC
(rev 18332)
@@ -34,6 +34,9 @@
/** Send a message to the node */
public void sendAsync(Message msg, AsyncMessageCallback cb, int
alreadyReportedBytes, ByteCounter ctr) throws NotConnectedException;
+ /** Send a throttled message to the node (may block for a long time). */
+ public void sendThrottledMessage(Message msg, int packetSize,
ByteCounter ctr) throws NotConnectedException;
+
/** Get the current boot ID. This is a random number that changes every
time the node starts up. */
public long getBootID();
Modified: trunk/freenet/src/freenet/io/xfer/BlockTransmitter.java
===================================================================
--- trunk/freenet/src/freenet/io/xfer/BlockTransmitter.java 2008-03-04
13:53:37 UTC (rev 18331)
+++ trunk/freenet/src/freenet/io/xfer/BlockTransmitter.java 2008-03-04
14:00:22 UTC (rev 18332)
@@ -59,19 +59,17 @@
private BitArray _sentPackets;
final PacketThrottle throttle;
private long timeAllSent = -1;
- final DoubleTokenBucket _masterThrottle;
final ByteCounter _ctr;
final int PACKET_SIZE;
private boolean asyncExitStatus;
private boolean asyncExitStatusSet;
- public BlockTransmitter(MessageCore usm, PeerContext destination, long
uid, PartiallyReceivedBlock source, DoubleTokenBucket masterThrottle,
ByteCounter ctr) {
+ public BlockTransmitter(MessageCore usm, PeerContext destination, long
uid, PartiallyReceivedBlock source, ByteCounter ctr) {
_usm = usm;
_destination = destination;
_uid = uid;
_prb = source;
_ctr = ctr;
- _masterThrottle = masterThrottle;
PACKET_SIZE = DMT.packetTransmitSize(_prb._packetSize,
_prb._packets)
+
destination.getOutgoingMangler().fullHeadersLengthOneMessage();
try {
@@ -100,8 +98,7 @@
}
int totalPackets;
try {
-
throttle.sendThrottledMessage(DMT.createPacketTransmit(_uid, packetNo,
_sentPackets, _prb.getPacket(packetNo)),
- _destination,
_masterThrottle, _prb._packetSize, _ctr);
+
_destination.sendThrottledMessage(DMT.createPacketTransmit(_uid, packetNo,
_sentPackets, _prb.getPacket(packetNo)), _prb._packetSize, _ctr);
if(_ctr != null)
_ctr.sentPayload(_prb._packetSize);
totalPackets=_prb.getNumPackets();
} catch (NotConnectedException e) {
Modified: trunk/freenet/src/freenet/io/xfer/BulkTransmitter.java
===================================================================
--- trunk/freenet/src/freenet/io/xfer/BulkTransmitter.java 2008-03-04
13:53:37 UTC (rev 18331)
+++ trunk/freenet/src/freenet/io/xfer/BulkTransmitter.java 2008-03-04
14:00:22 UTC (rev 18332)
@@ -39,8 +39,6 @@
private boolean cancelled;
/** Not persistent over reboots */
final long peerBootID;
- /** The overall hard bandwidth limiter */
- final DoubleTokenBucket masterThrottle;
private boolean sentCancel;
private boolean finished;
final int packetSize;
@@ -59,11 +57,10 @@
* @param noWait If true, don't wait for an FNPBulkReceivedAll, return
as soon as we've sent everything.
* @throws DisconnectedException If the peer we are trying to send to
becomes disconnected.
*/
- public BulkTransmitter(PartiallyReceivedBulk prb, PeerContext peer,
long uid, DoubleTokenBucket masterThrottle, boolean noWait, ByteCounter ctr)
throws DisconnectedException {
+ public BulkTransmitter(PartiallyReceivedBulk prb, PeerContext peer,
long uid, boolean noWait, ByteCounter ctr) throws DisconnectedException {
this.prb = prb;
this.peer = peer;
this.uid = uid;
- this.masterThrottle = masterThrottle;
this.noWait = noWait;
this.ctr = ctr;
peerBootID = peer.getBootID();
@@ -247,8 +244,7 @@
// Congestion control and bandwidth limiting
try {
-
peer.getThrottle().sendThrottledMessage(DMT.createFNPBulkPacketSend(uid,
blockNo, buf), peer,
- masterThrottle, prb.blockSize,
ctr);
+
peer.sendThrottledMessage(DMT.createFNPBulkPacketSend(uid, prb.blockSize, buf),
packetSize, ctr);
if(ctr != null) ctr.sentPayload(prb.blockSize);
synchronized(this) {
blocksNotSentButPresent.setBit(blockNo,
false);
Modified: trunk/freenet/src/freenet/node/CHKInsertSender.java
===================================================================
--- trunk/freenet/src/freenet/node/CHKInsertSender.java 2008-03-04 13:53:37 UTC
(rev 18331)
+++ trunk/freenet/src/freenet/node/CHKInsertSender.java 2008-03-04 14:00:22 UTC
(rev 18332)
@@ -46,7 +46,7 @@
BackgroundTransfer(PeerNode pn, PartiallyReceivedBlock prb) {
this.pn = pn;
- bt = new BlockTransmitter(node.usm, pn, uid, prb,
node.outputThrottle, CHKInsertSender.this);
+ bt = new BlockTransmitter(node.usm, pn, uid, prb,
CHKInsertSender.this);
}
void start() {
Modified: trunk/freenet/src/freenet/node/DarknetPeerNode.java
===================================================================
--- trunk/freenet/src/freenet/node/DarknetPeerNode.java 2008-03-04 13:53:37 UTC
(rev 18331)
+++ trunk/freenet/src/freenet/node/DarknetPeerNode.java 2008-03-04 14:00:22 UTC
(rev 18332)
@@ -881,7 +881,7 @@
public void send() throws DisconnectedException {
prb = new PartiallyReceivedBulk(node.usm, size,
Node.PACKET_SIZE, data, true);
- transmitter = new BulkTransmitter(prb,
DarknetPeerNode.this, uid, node.outputThrottle, false, null);
+ transmitter = new BulkTransmitter(prb,
DarknetPeerNode.this, uid, false, null);
if(logMINOR)
Logger.minor(this, "Sending "+uid);
node.executor.execute(new Runnable() {
Modified: trunk/freenet/src/freenet/node/FailureTable.java
===================================================================
--- trunk/freenet/src/freenet/node/FailureTable.java 2008-03-04 13:53:37 UTC
(rev 18331)
+++ trunk/freenet/src/freenet/node/FailureTable.java 2008-03-04 14:00:22 UTC
(rev 18332)
@@ -396,7 +396,7 @@
PartiallyReceivedBlock prb =
new PartiallyReceivedBlock(Node.PACKETS_IN_BLOCK,
Node.PACKET_SIZE, block.getRawData());
final BlockTransmitter bt =
- new BlockTransmitter(node.usm, source, uid, prb,
node.outputThrottle, senderCounter);
+ new BlockTransmitter(node.usm, source, uid, prb,
senderCounter);
node.executor.execute(new PrioRunnable() {
public int getPriority() {
Modified: trunk/freenet/src/freenet/node/OpennetManager.java
===================================================================
--- trunk/freenet/src/freenet/node/OpennetManager.java 2008-03-04 13:53:37 UTC
(rev 18331)
+++ trunk/freenet/src/freenet/node/OpennetManager.java 2008-03-04 14:00:22 UTC
(rev 18332)
@@ -508,7 +508,7 @@
new PartiallyReceivedBulk(node.usm, padded.length,
Node.PACKET_SIZE, raf, true);
try {
BulkTransmitter bt =
- new BulkTransmitter(prb, peer, xferUID,
node.outputThrottle, true, ctr);
+ new BulkTransmitter(prb, peer, xferUID, true,
ctr);
bt.send();
} catch (DisconnectedException e) {
throw new NotConnectedException(e);
Modified: trunk/freenet/src/freenet/node/PeerNode.java
===================================================================
--- trunk/freenet/src/freenet/node/PeerNode.java 2008-03-04 13:53:37 UTC
(rev 18331)
+++ trunk/freenet/src/freenet/node/PeerNode.java 2008-03-04 14:00:22 UTC
(rev 18332)
@@ -57,6 +57,7 @@
import freenet.keys.Key;
import freenet.keys.USK;
import freenet.support.Base64;
+import freenet.support.DoubleTokenBucket;
import freenet.support.Fields;
import freenet.support.HexUtil;
import freenet.support.IllegalBase64Exception;
@@ -3639,4 +3640,7 @@
return resendBytesSent;
}
+ public void sendThrottledMessage(Message msg, int packetSize,
ByteCounter ctr) throws NotConnectedException {
+ getThrottle().sendThrottledMessage(msg, this,
node.outputThrottle, packetSize, ctr);
+ }
}
Modified: trunk/freenet/src/freenet/node/RequestHandler.java
===================================================================
--- trunk/freenet/src/freenet/node/RequestHandler.java 2008-03-04 13:53:37 UTC
(rev 18331)
+++ trunk/freenet/src/freenet/node/RequestHandler.java 2008-03-04 14:00:22 UTC
(rev 18332)
@@ -175,7 +175,7 @@
PartiallyReceivedBlock prb = rs.getPRB();
bt =
- new BlockTransmitter(node.usm, source, uid, prb,
node.outputThrottle, this);
+ new BlockTransmitter(node.usm, source, uid, prb, this);
node.addTransferringRequestHandler(uid);
bt.sendAsync(node.executor);
} catch (NotConnectedException e) {
@@ -349,7 +349,7 @@
PartiallyReceivedBlock prb =
new PartiallyReceivedBlock(Node.PACKETS_IN_BLOCK,
Node.PACKET_SIZE, block.getRawData());
BlockTransmitter bt =
- new BlockTransmitter(node.usm, source, uid, prb,
node.outputThrottle, this);
+ new BlockTransmitter(node.usm, source, uid, prb, this);
node.addTransferringRequestHandler(uid);
source.sendAsync(df, null, 0, this);
if(bt.send(node.executor)) {
Modified: trunk/freenet/src/freenet/node/updater/UpdateOverMandatoryManager.java
===================================================================
--- trunk/freenet/src/freenet/node/updater/UpdateOverMandatoryManager.java
2008-03-04 13:53:37 UTC (rev 18331)
+++ trunk/freenet/src/freenet/node/updater/UpdateOverMandatoryManager.java
2008-03-04 14:00:22 UTC (rev 18332)
@@ -494,7 +494,7 @@
final BulkTransmitter bt;
try {
- bt = new BulkTransmitter(prb, source, uid,
updateManager.node.outputThrottle, false, updateManager.ctr);
+ bt = new BulkTransmitter(prb, source, uid, false,
updateManager.ctr);
} catch (DisconnectedException e) {
Logger.error(this, "Peer "+source+" asked us for the
blob file for the revocation key, then disconnected: "+e, e);
return true;
@@ -867,7 +867,7 @@
final BulkTransmitter bt;
try {
- bt = new BulkTransmitter(prb, source, uid,
updateManager.node.outputThrottle, false, updateManager.ctr);
+ bt = new BulkTransmitter(prb, source, uid, false,
updateManager.ctr);
} catch (DisconnectedException e) {
Logger.error(this, "Peer "+source+" asked us for the
blob file for the main jar, then disconnected: "+e, e);
return true;