Author: toad Date: 2008-03-01 21:17:08 +0000 (Sat, 01 Mar 2008) New Revision: 18303
Modified: trunk/freenet/src/freenet/io/xfer/BulkReceiver.java trunk/freenet/src/freenet/io/xfer/BulkTransmitter.java trunk/freenet/src/freenet/node/DarknetPeerNode.java trunk/freenet/src/freenet/node/OpennetManager.java trunk/freenet/src/freenet/node/updater/UpdateOverMandatoryManager.java Log: Bulk* now supports a ByteCounter. Modified: trunk/freenet/src/freenet/io/xfer/BulkReceiver.java =================================================================== --- trunk/freenet/src/freenet/io/xfer/BulkReceiver.java 2008-03-01 21:08:07 UTC (rev 18302) +++ trunk/freenet/src/freenet/io/xfer/BulkReceiver.java 2008-03-01 21:17:08 UTC (rev 18303) @@ -3,6 +3,7 @@ * http://www.gnu.org/ for further details of the GPL. */ package freenet.io.xfer; +import freenet.io.comm.ByteCounter; import freenet.io.comm.DMT; import freenet.io.comm.DisconnectedException; import freenet.io.comm.Message; @@ -29,12 +30,14 @@ private boolean sentCancel; /** Not persistent over reboots */ final long peerBootID; + private final ByteCounter ctr; - public BulkReceiver(PartiallyReceivedBulk prb, PeerContext peer, long uid) { + public BulkReceiver(PartiallyReceivedBulk prb, PeerContext peer, long uid, ByteCounter ctr) { this.prb = prb; this.peer = peer; this.uid = uid; this.peerBootID = peer.getBootID(); + this.ctr = ctr; } public void onAborted() { @@ -43,7 +46,7 @@ sentCancel = true; } try { - peer.sendAsync(DMT.createFNPBulkReceiveAborted(uid), null, 0, null); + peer.sendAsync(DMT.createFNPBulkReceiveAborted(uid), null, 0, ctr); } catch (NotConnectedException e) { // Cool } @@ -59,7 +62,7 @@ MessageFilter mfPacket = MessageFilter.create().setSource(peer).setType(DMT.FNPBulkPacketSend) .setField(DMT.UID, uid).setTimeout(TIMEOUT); if(prb.hasWholeFile()) { try { - peer.sendAsync(DMT.createFNPBulkReceivedAll(uid), null, 0, null); + peer.sendAsync(DMT.createFNPBulkReceivedAll(uid), null, 0, ctr); } catch (NotConnectedException e) { // Ignore, we have the data. } @@ -67,7 +70,7 @@ } Message m; try { - m = prb.usm.waitFor(mfSendKilled.or(mfPacket), null); + m = prb.usm.waitFor(mfSendKilled.or(mfPacket), ctr); } catch (DisconnectedException e) { prb.abort(RetrievalException.SENDER_DISCONNECTED, "Sender disconnected"); return false; Modified: trunk/freenet/src/freenet/io/xfer/BulkTransmitter.java =================================================================== --- trunk/freenet/src/freenet/io/xfer/BulkTransmitter.java 2008-03-01 21:08:07 UTC (rev 18302) +++ trunk/freenet/src/freenet/io/xfer/BulkTransmitter.java 2008-03-01 21:17:08 UTC (rev 18303) @@ -4,6 +4,7 @@ package freenet.io.xfer; import freenet.io.comm.AsyncMessageFilterCallback; +import freenet.io.comm.ByteCounter; import freenet.io.comm.DMT; import freenet.io.comm.DisconnectedException; import freenet.io.comm.Message; @@ -47,6 +48,7 @@ final boolean noWait; private long finishTime=-1; private String cancelReason; + private final ByteCounter ctr; /** * Create a bulk data transmitter. @@ -57,12 +59,13 @@ * @param noWait If true, don't wait for an FNPBulkReceivedAll, return as soon as we've sent everything. * @throws DisconnectedException If the peer we are trying to send to becomes disconnected. */ - public BulkTransmitter(PartiallyReceivedBulk prb, PeerContext peer, long uid, DoubleTokenBucket masterThrottle, boolean noWait) throws DisconnectedException { + public BulkTransmitter(PartiallyReceivedBulk prb, PeerContext peer, long uid, DoubleTokenBucket masterThrottle, boolean noWait, ByteCounter ctr) throws DisconnectedException { this.prb = prb; this.peer = peer; this.uid = uid; this.masterThrottle = masterThrottle; this.noWait = noWait; + this.ctr = ctr; peerBootID = peer.getBootID(); // Need to sync on prb while doing both operations, to avoid race condition. // Specifically, we must not get calls to blockReceived() until blocksNotSentButPresent @@ -153,7 +156,7 @@ sentCancel = true; } try { - peer.sendAsync(DMT.createFNPBulkSendAborted(uid), null, 0, null); + peer.sendAsync(DMT.createFNPBulkSendAborted(uid), null, 0, ctr); } catch (NotConnectedException e) { // Cool } @@ -245,7 +248,7 @@ // Congestion control and bandwidth limiting try { peer.getThrottle().sendThrottledMessage(DMT.createFNPBulkPacketSend(uid, blockNo, buf), peer, - masterThrottle, prb.blockSize, null); + masterThrottle, prb.blockSize, ctr); synchronized(this) { blocksNotSentButPresent.setBit(blockNo, false); } Modified: trunk/freenet/src/freenet/node/DarknetPeerNode.java =================================================================== --- trunk/freenet/src/freenet/node/DarknetPeerNode.java 2008-03-01 21:08:07 UTC (rev 18302) +++ trunk/freenet/src/freenet/node/DarknetPeerNode.java 2008-03-01 21:17:08 UTC (rev 18303) @@ -842,7 +842,7 @@ throw new Error("Impossible: FileNotFoundException opening with RAF with rw! "+e, e); } prb = new PartiallyReceivedBulk(node.usm, size, Node.PACKET_SIZE, data, false); - receiver = new BulkReceiver(prb, DarknetPeerNode.this, uid); + receiver = new BulkReceiver(prb, DarknetPeerNode.this, uid, null); // FIXME make this persistent node.executor.execute(new Runnable() { public void run() { @@ -881,7 +881,7 @@ public void send() throws DisconnectedException { prb = new PartiallyReceivedBulk(node.usm, size, Node.PACKET_SIZE, data, true); - transmitter = new BulkTransmitter(prb, DarknetPeerNode.this, uid, node.outputThrottle, false); + transmitter = new BulkTransmitter(prb, DarknetPeerNode.this, uid, node.outputThrottle, false, null); if(logMINOR) Logger.minor(this, "Sending "+uid); node.executor.execute(new Runnable() { Modified: trunk/freenet/src/freenet/node/OpennetManager.java =================================================================== --- trunk/freenet/src/freenet/node/OpennetManager.java 2008-03-01 21:08:07 UTC (rev 18302) +++ trunk/freenet/src/freenet/node/OpennetManager.java 2008-03-01 21:17:08 UTC (rev 18303) @@ -508,7 +508,7 @@ new PartiallyReceivedBulk(node.usm, padded.length, Node.PACKET_SIZE, raf, true); try { BulkTransmitter bt = - new BulkTransmitter(prb, peer, xferUID, node.outputThrottle, true); + new BulkTransmitter(prb, peer, xferUID, node.outputThrottle, true, null); bt.send(); } catch (DisconnectedException e) { throw new NotConnectedException(e); @@ -611,7 +611,7 @@ byte[] buf = new byte[paddedLength]; ByteArrayRandomAccessThing raf = new ByteArrayRandomAccessThing(buf); PartiallyReceivedBulk prb = new PartiallyReceivedBulk(node.usm, buf.length, Node.PACKET_SIZE, raf, false); - BulkReceiver br = new BulkReceiver(prb, source, xferUID); + BulkReceiver br = new BulkReceiver(prb, source, xferUID, null); if(logMINOR) Logger.minor(this, "Receiving noderef (reply="+isReply+") as bulk transfer for request uid "+uid+" with transfer "+xferUID+" from "+source); if(!br.receive()) { Modified: trunk/freenet/src/freenet/node/updater/UpdateOverMandatoryManager.java =================================================================== --- trunk/freenet/src/freenet/node/updater/UpdateOverMandatoryManager.java 2008-03-01 21:08:07 UTC (rev 18302) +++ trunk/freenet/src/freenet/node/updater/UpdateOverMandatoryManager.java 2008-03-01 21:17:08 UTC (rev 18303) @@ -494,7 +494,7 @@ final BulkTransmitter bt; try { - bt = new BulkTransmitter(prb, source, uid, updateManager.node.outputThrottle, false); + bt = new BulkTransmitter(prb, source, uid, updateManager.node.outputThrottle, false, null); } catch (DisconnectedException e) { Logger.error(this, "Peer "+source+" asked us for the blob file for the revocation key, then disconnected: "+e, e); return true; @@ -624,7 +624,7 @@ PartiallyReceivedBulk prb = new PartiallyReceivedBulk(updateManager.node.getUSM(), length, Node.PACKET_SIZE, raf, false); - final BulkReceiver br = new BulkReceiver(prb, source, uid); + final BulkReceiver br = new BulkReceiver(prb, source, uid, null); updateManager.node.executor.execute(new Runnable() { @@ -867,7 +867,7 @@ final BulkTransmitter bt; try { - bt = new BulkTransmitter(prb, source, uid, updateManager.node.outputThrottle, false); + bt = new BulkTransmitter(prb, source, uid, updateManager.node.outputThrottle, false, null); } catch (DisconnectedException e) { Logger.error(this, "Peer "+source+" asked us for the blob file for the main jar, then disconnected: "+e, e); return true; @@ -1006,7 +1006,7 @@ PartiallyReceivedBulk prb = new PartiallyReceivedBulk(updateManager.node.getUSM(), length, Node.PACKET_SIZE, raf, false); - final BulkReceiver br = new BulkReceiver(prb, source, uid); + final BulkReceiver br = new BulkReceiver(prb, source, uid, null); updateManager.node.executor.execute(new Runnable() {
