Author: toad
Date: 2008-12-02 13:53:48 +0000 (Tue, 02 Dec 2008)
New Revision: 24012
Modified:
trunk/freenet/src/freenet/node/BlockedTooLongException.java
trunk/freenet/src/freenet/node/FNPPacketMangler.java
trunk/freenet/src/freenet/node/KeyTracker.java
trunk/freenet/src/freenet/node/OutgoingPacketMangler.java
trunk/freenet/src/freenet/node/PeerNode.java
trunk/freenet/src/freenet/node/ResendPacketItem.java
trunk/freenet/src/freenet/node/StillNotAckedException.java
Log:
REFACTOR: KeyTracker -> PacketTracker + KeyTracker.
KeyTracker is now simply a session key; PacketTracker holds all the
retransmission and acking logic.
One KeyTracker has a fixed PacketTracker. However, a PacketTracker may (soon) be
used by multiple KeyTrackers.
Modified: trunk/freenet/src/freenet/node/BlockedTooLongException.java
===================================================================
--- trunk/freenet/src/freenet/node/BlockedTooLongException.java 2008-12-02
13:48:21 UTC (rev 24011)
+++ trunk/freenet/src/freenet/node/BlockedTooLongException.java 2008-12-02
13:53:48 UTC (rev 24012)
@@ -2,10 +2,10 @@
public class BlockedTooLongException extends Exception {
- public final KeyTracker tracker;
+ public final PacketTracker tracker;
public final long delta;
- public BlockedTooLongException(KeyTracker tracker, long delta) {
+ public BlockedTooLongException(PacketTracker tracker, long delta) {
this.tracker = tracker;
this.delta = delta;
}
Modified: trunk/freenet/src/freenet/node/FNPPacketMangler.java
===================================================================
--- trunk/freenet/src/freenet/node/FNPPacketMangler.java 2008-12-02
13:48:21 UTC (rev 24011)
+++ trunk/freenet/src/freenet/node/FNPPacketMangler.java 2008-12-02
13:53:48 UTC (rev 24012)
@@ -1773,7 +1773,9 @@
(seqBuf[2] & 0xff)) << 8) +
(seqBuf[3] & 0xff);
- int targetSeqNumber =
tracker.highestReceivedIncomingSeqNumber();
+ PacketTracker packets = tracker.packets;
+
+ int targetSeqNumber =
packets.highestReceivedIncomingSeqNumber();
if(logMINOR) Logger.minor(this, "Seqno: "+seqNumber+" (highest
seen "+targetSeqNumber+") receiving packet from "+tracker.pn.getPeer());
if(seqNumber == -1) {
@@ -1924,7 +1926,8 @@
acks[i] = referenceSeqNumber - offset;
}
- tracker.acknowledgedPackets(acks);
+ PacketTracker packets = tracker.packets;
+ packets.acknowledgedPackets(acks);
int retransmitCount = decrypted[ptr++] & 0xff;
if(logMINOR) Logger.minor(this, "Retransmit requests:
"+retransmitCount);
@@ -1936,7 +1939,7 @@
}
int realSeqNo = referenceSeqNumber - offset;
if(logMINOR) Logger.minor(this, "RetransmitRequest:
"+realSeqNo);
- tracker.resendPacket(realSeqNo);
+ packets.resendPacket(realSeqNo);
}
int ackRequestsCount = decrypted[ptr++] & 0xff;
@@ -1951,7 +1954,7 @@
}
int realSeqNo = realSeqNumber - offset;
if(logMINOR) Logger.minor(this, "AckRequest:
"+realSeqNo);
- tracker.receivedAckRequest(realSeqNo);
+ packets.receivedAckRequest(realSeqNo);
}
int forgottenCount = decrypted[ptr++] & 0xff;
@@ -1963,7 +1966,7 @@
Logger.error(this, "Packet not long enough at
byte "+ptr+" on "+tracker);
}
int realSeqNo = realSeqNumber - offset;
- tracker.destForgotPacket(realSeqNo);
+ packets.destForgotPacket(realSeqNo);
}
tracker.pn.receivedPacket(false, true); // Must keep the
connection open, even if it's an ack packet only and on an incompatible
connection - we may want to do a UOM transfer e.g.
@@ -1971,13 +1974,13 @@
// No sequence number == no messages
- if((seqNumber != -1) && tracker.alreadyReceived(seqNumber)) {
- tracker.queueAck(seqNumber); // Must keep the
connection open!
+ if((seqNumber != -1) && packets.alreadyReceived(seqNumber)) {
+ packets.queueAck(seqNumber); // Must keep the
connection open!
if(logMINOR) Logger.minor(this, "Received packet twice
("+seqNumber+") from "+tracker.pn.getPeer()+": "+seqNumber+"
("+TimeUtil.formatTime((long) tracker.pn.averagePingTime(), 2, true)+" ping
avg)");
return;
}
- tracker.receivedPacket(seqNumber);
+ packets.receivedPacket(seqNumber);
if(seqNumber == -1) {
if(logMINOR) Logger.minor(this, "Returning because
seqno = "+seqNumber);
@@ -2031,7 +2034,8 @@
}
return false;
}
- if(kt.wouldBlock(false)) {
+ PacketTracker packets = kt.packets;
+ if(packets.wouldBlock(false)) {
if(logMINOR) Logger.minor(this, "Would block: "+kt);
// Requeue
if(!dontRequeue) {
@@ -2040,7 +2044,7 @@
return false;
}
int length = 1;
- length += kt.countAcks() + kt.countAckRequests() +
kt.countResendRequests();
+ length += packets.countAcks() + packets.countAckRequests() +
packets.countResendRequests();
int callbacksCount = 0;
int x = 0;
String mi_name = null;
@@ -2052,7 +2056,7 @@
if(mi.formatted) {
try {
byte[] buf = mi.getData();
- int packetNumber =
kt.allocateOutgoingPacketNumberNeverBlock();
+ int packetNumber =
packets.allocateOutgoingPacketNumberNeverBlock();
int size =
processOutgoingPreformatted(buf, 0, buf.length, kt, packetNumber, mi.cb,
mi.getPriority());
//MARK: onSent()
mi.onSent(size);
@@ -2165,7 +2169,7 @@
requeueLogString = ", requeueing remaining
messages";
}
length = 1;
- length += kt.countAcks() + kt.countAckRequests() +
kt.countResendRequests();
+ length += packets.countAcks() +
packets.countAckRequests() + packets.countResendRequests();
int count = 0;
int lastIndex = 0;
if(logMINOR)
@@ -2282,8 +2286,9 @@
Logger.normal(this, "Dropping packet:
Not connected to "+peer.getPeer()+" yet(2)");
throw new NotConnectedException();
}
- int seqNo = neverWaitForPacketNumber ?
tracker.allocateOutgoingPacketNumberNeverBlock() :
- tracker.allocateOutgoingPacketNumber();
+ PacketTracker packets = tracker.packets;
+ int seqNo = neverWaitForPacketNumber ?
packets.allocateOutgoingPacketNumberNeverBlock() :
+ packets.allocateOutgoingPacketNumber();
return processOutgoingPreformatted(buf, offset,
length, tracker, seqNo, callbacks, priority);
} catch (KeyChangedException e) {
Logger.normal(this, "Key changed(2) for
"+peer.getPeer());
@@ -2331,6 +2336,7 @@
int[] acks, resendRequests, ackRequests, forgotPackets;
int seqNumber;
+ PacketTracker packets = tracker.packets;
/* Locking:
* Avoid allocating a packet number, then a long pause due to
* overload, during which many other packets are sent,
@@ -2350,7 +2356,7 @@
// Ack/resendreq only packet
seqNumber = -1;
else
- seqNumber =
tracker.allocateOutgoingPacketNumberNeverBlock();
+ seqNumber =
packets.allocateOutgoingPacketNumberNeverBlock();
}
if(logMINOR) Logger.minor(this, "Sequence number (sending):
"+seqNumber+" ("+packetNumber+") to "+tracker.pn.getPeer());
@@ -2363,12 +2369,12 @@
try {
synchronized(tracker) {
- acks = tracker.grabAcks();
- forgotPackets = tracker.grabForgotten();
- resendRequests = tracker.grabResendRequests();
- ackRequests = tracker.grabAckRequests();
- realSeqNumber = tracker.getLastOutgoingSeqNumber();
- otherSideSeqNumber =
tracker.highestReceivedIncomingSeqNumber();
+ acks = packets.grabAcks();
+ forgotPackets = packets.grabForgotten();
+ resendRequests = packets.grabResendRequests();
+ ackRequests = packets.grabAckRequests();
+ realSeqNumber = packets.getLastOutgoingSeqNumber();
+ otherSideSeqNumber =
packets.highestReceivedIncomingSeqNumber();
if(logMINOR) Logger.minor(this, "Sending packet to
"+tracker.pn.getPeer()+", other side max seqno: "+otherSideSeqNumber);
}
} catch (StillNotAckedException e) {
@@ -2516,7 +2522,7 @@
if(logMINOR) Logger.minor(this, "Forgot packet
"+i+": "+seq);
int offsetSeq = realSeqNumber - seq;
if((offsetSeq > 255) || (offsetSeq < 0)) {
- if(tracker.isDeprecated()) {
+ if(packets.isDeprecated()) {
// Oh well
Logger.error(this, "Dropping
forgot-packet notification on deprecated tracker: "+seq+" on "+tracker+" - real
seq="+realSeqNumber);
// Ignore it
@@ -2531,7 +2537,7 @@
forgotOffsets[i] = (byte) offsetSeq;
forgotCount++;
if(forgotCount == 256)
-
tracker.requeueForgot(forgotPackets, forgotCount, forgotPackets.length -
forgotCount);
+
packets.requeueForgot(forgotPackets, forgotCount, forgotPackets.length -
forgotCount);
}
}
}
@@ -2561,7 +2567,7 @@
if(seqNumber != -1) {
byte[] saveable = new byte[length];
System.arraycopy(buf, offset, saveable, 0, length);
- tracker.sentPacket(saveable, seqNumber, callbacks,
priority);
+ packets.sentPacket(saveable, seqNumber, callbacks,
priority);
}
if(logMINOR) Logger.minor(this, "Sending... "+seqNumber);
@@ -2782,8 +2788,8 @@
return !((PeerNode)context).isConnected();
}
- public void resend(ResendPacketItem item) throws
PacketSequenceException, WouldBlockException, KeyChangedException,
NotConnectedException {
- int size = processOutgoingPreformatted(item.buf, 0,
item.buf.length, item.kt, item.packetNumber, item.callbacks, item.priority);
+ public void resend(ResendPacketItem item, KeyTracker tracker) throws
PacketSequenceException, WouldBlockException, KeyChangedException,
NotConnectedException {
+ int size = processOutgoingPreformatted(item.buf, 0,
item.buf.length, tracker, item.packetNumber, item.callbacks, item.priority);
item.pn.resendByteCounter.sentBytes(size);
}
Modified: trunk/freenet/src/freenet/node/KeyTracker.java
===================================================================
--- trunk/freenet/src/freenet/node/KeyTracker.java 2008-12-02 13:48:21 UTC
(rev 24011)
+++ trunk/freenet/src/freenet/node/KeyTracker.java 2008-12-02 13:53:48 UTC
(rev 24012)
@@ -3,1133 +3,35 @@
* http://www.gnu.org/ for further details of the GPL. */
package freenet.node;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Vector;
-
import freenet.crypt.BlockCipher;
-import freenet.io.comm.AsyncMessageCallback;
-import freenet.io.comm.DMT;
-import freenet.io.comm.NotConnectedException;
-import freenet.io.xfer.PacketThrottle;
-import freenet.support.DoublyLinkedList;
-import freenet.support.IndexableUpdatableSortedLinkedListItem;
-import freenet.support.LimitedRangeIntByteArrayMap;
-import freenet.support.LimitedRangeIntByteArrayMapElement;
-import freenet.support.Logger;
-import freenet.support.ReceivedPacketNumbers;
-import freenet.support.TimeUtil;
-import freenet.support.UpdatableSortedLinkedListItem;
-import freenet.support.UpdatableSortedLinkedListKilledException;
-import freenet.support.UpdatableSortedLinkedListWithForeignIndex;
-import freenet.support.WouldBlockException;
-import freenet.support.DoublyLinkedList.Item;
/**
- * @author amphibian
- *
- * Class to track everything related to a single key on a single
- * PeerNode. In particular, the key itself, packets sent and
- * received, and packet numbers.
+ * Class representing a single session key.
+ * @author Matthew Toseland <toad at amphibian.dyndns.org> (0xE43DA450)
*/
public class KeyTracker {
-
+
+ /** A PacketTracker may have more than one KeyTracker, but a KeyTracker
+ * may only have one PacketTracker. In other words, in some cases it is
+ * possible to change the session key without invalidating the packet
+ * sequence, but it is never possible to invalidate the packet sequence
+ * without changing the session key. */
+ final PacketTracker packets;
+
private static boolean logMINOR;
/** Parent PeerNode */
public final PeerNode pn;
- /** Are we the secondary key? */
- private volatile boolean isDeprecated;
/** Cipher to both encrypt outgoing packets with and decrypt
* incoming ones. */
public final BlockCipher sessionCipher;
/** Key for above cipher, so far for debugging */
public final byte[] sessionKey;
- /** Packets we have sent to the node, minus those that have
- * been acknowledged. */
- private final LimitedRangeIntByteArrayMap sentPacketsContents;
- /** Serial numbers of packets that we want to acknowledge,
- * and when they become urgent. We always add to the end,
- * and we always remove from the beginning, so should always
- * be consistent. */
- private final List<QueuedAck> ackQueue;
- /** Serial numbers of packets that we have forgotten. Usually
- * when we have forgotten a packet it just means that it has
- * been shifted to another KeyTracker because this one was
- * deprecated; the messages will get through in the end.
- */
- private final List<QueuedForgotten> forgottenQueue;
- /** The highest incoming serial number we have ever seen
- * from the other side. Includes actual packets and resend
- * requests (provided they are within range). */
- private int highestSeenIncomingSerialNumber;
- /** Serial numbers of packets we want to be resent by the
- * other side to us, the time at which they become sendable,
- * and the time at which they become urgent. In order of
- * the latter. */
- private final UpdatableSortedLinkedListWithForeignIndex
resendRequestQueue;
- /** Serial numbers of packets we want to be acknowledged by
- * the other side, the time at which they become sendable,
- * and the time at which they become urgent. In order of
- * the latter. */
- private final UpdatableSortedLinkedListWithForeignIndex ackRequestQueue;
- /** Numbered packets that we need to send to the other side
- * because they asked for them. Just contains the numbers. */
- private final HashSet<Integer> packetsToResend;
- /** Ranges of packet numbers we have received from the other
- * side. */
- private final ReceivedPacketNumbers packetNumbersReceived;
- /** Counter to generate the next packet number */
- private int nextPacketNumber;
- final long createdTime;
- /** The time at which we last successfully decoded a packet. */
- private long timeLastDecodedPacket;
- /** Everything is clear to start with */
- KeyTracker(PeerNode pn, BlockCipher cipher, byte[] sessionKey) {
- this.pn = pn;
+ KeyTracker(PeerNode parent, PacketTracker tracker, BlockCipher cipher,
byte[] sessionKey) {
+ this.pn = parent;
+ this.packets = tracker;
this.sessionCipher = cipher;
this.sessionKey = sessionKey;
- ackQueue = new LinkedList<QueuedAck>();
- forgottenQueue = new LinkedList<QueuedForgotten>();
- highestSeenIncomingSerialNumber = -1;
- // give some leeway
- sentPacketsContents = new LimitedRangeIntByteArrayMap(128);
- resendRequestQueue = new
UpdatableSortedLinkedListWithForeignIndex();
- ackRequestQueue = new
UpdatableSortedLinkedListWithForeignIndex();
- packetsToResend = new HashSet<Integer>();
- packetNumbersReceived = new ReceivedPacketNumbers(512);
- isDeprecated = false;
- nextPacketNumber = pn.node.random.nextInt(100 * 1000);
- createdTime = System.currentTimeMillis();
- logMINOR = Logger.shouldLog(Logger.MINOR, this);
}
- /**
- * Set the deprecated flag to indicate that we are now
- * no longer the primary key. And wake up any threads trying to lock
- * a packet number; they can be sent with the new KT.
- *
- * After this, new packets will not be sent. It will not be possible to
allocate a new
- * packet number. However, old resend requests etc may still be sent.
- */
- public void deprecated() {
- logMINOR = Logger.shouldLog(Logger.MINOR, this);
- isDeprecated = true;
- sentPacketsContents.interrupt();
- }
-
- /**
- * @return The highest received incoming serial number.
- */
- public int highestReceivedIncomingSeqNumber() {
- return this.highestSeenIncomingSerialNumber;
- }
-
- /**
- * Received this packet??
- */
- public boolean alreadyReceived(int seqNumber) {
- return packetNumbersReceived.contains(seqNumber);
- }
-
- /** toString() - don't leak the key unless asked to */
- @Override
- public String toString() {
- return super.toString() + " for " + pn.shortToString();
- }
-
- /**
- * Queue an ack to be sent back to the node soon.
- * @param seqNumber The number of the packet to be acked.
- */
- public void queueAck(int seqNumber) {
- if(logMINOR)
- Logger.minor(this, "Queueing ack for " + seqNumber);
- QueuedAck qa = new QueuedAck(seqNumber);
- synchronized(ackQueue) {
- ackQueue.add(qa);
- }
- // Will go urgent in 200ms
- }
-
- public void queueForgotten(int seqNumber) {
- queueForgotten(seqNumber, true);
- }
-
- public void queueForgotten(int seqNumber, boolean log) {
- if(log && ((!isDeprecated) || logMINOR)) {
- String msg = "Queueing forgotten for " + seqNumber + "
for " + this;
- if(!isDeprecated)
- Logger.error(this, msg);
- else
- Logger.minor(this, msg);
- }
- QueuedForgotten qf = new QueuedForgotten(seqNumber);
- synchronized(forgottenQueue) {
- forgottenQueue.add(qf);
- }
- }
-
- static class PacketActionItem { // anyone got a better name?
-
- /** Packet sequence number */
- int packetNumber;
- /** Time at which this packet's ack or resend request becomes
urgent
- * and can trigger an otherwise empty packet to be sent. */
- long urgentTime;
-
- @Override
- public String toString() {
- return super.toString() + ": packet " + packetNumber +
" urgent@" + urgentTime + '(' + (System.currentTimeMillis() - urgentTime) + ')';
- }
- }
-
- private final static class QueuedAck extends PacketActionItem {
-
- QueuedAck(int packet) {
- long now = System.currentTimeMillis();
- packetNumber = packet;
- /** If not included on a packet in next 200ms, then
- * force a send of an otherwise empty packet.
- */
- urgentTime = now + 200;
- }
- }
-
- // FIXME this is almost identical to QueuedAck, coalesce the classes
- private final static class QueuedForgotten extends PacketActionItem {
-
- QueuedForgotten(int packet) {
- long now = System.currentTimeMillis();
- packetNumber = packet;
- /** If not included on a packet in next 500ms, then
- * force a send of an otherwise empty packet.
- */
- urgentTime = now + 500;
- }
- }
-
- private abstract class BaseQueuedResend extends PacketActionItem
- implements IndexableUpdatableSortedLinkedListItem {
-
- /** Time at which this item becomes sendable.
- * When we send a resend request, this is reset to t+500ms.
- *
- * Constraint: urgentTime is always greater than activeTime.
- */
- long activeTime;
-
- void sent() throws UpdatableSortedLinkedListKilledException {
- long now = System.currentTimeMillis();
- activeTime = now + 500;
- urgentTime = activeTime + urgentDelay();
- // This is only removed when we actually receive the packet
- // But for now it will sleep
- }
-
- BaseQueuedResend(int packetNumber) {
- this.packetNumber = packetNumber;
- long now = System.currentTimeMillis();
- activeTime = initialActiveTime(now);
- urgentTime = activeTime + urgentDelay();
- }
-
- abstract long urgentDelay();
-
- abstract long initialActiveTime(long now);
- private Item next;
- private Item prev;
-
- public final Item getNext() {
- return next;
- }
-
- public final Item setNext(Item i) {
- Item old = next;
- next = i;
- return old;
- }
-
- public Item getPrev() {
- return prev;
- }
-
- public Item setPrev(Item i) {
- Item old = prev;
- prev = i;
- return old;
- }
-
- public int compareTo(Object o) {
- BaseQueuedResend r = (BaseQueuedResend) o;
- if(urgentTime > r.urgentTime)
- return 1;
- if(urgentTime < r.urgentTime)
- return -1;
- if(packetNumber > r.packetNumber)
- return 1;
- if(packetNumber < r.packetNumber)
- return -1;
- return 0;
- }
-
- public Object indexValue() {
- return packetNumber;
- }
- private DoublyLinkedList parent;
-
- public DoublyLinkedList getParent() {
- return parent;
- }
-
- public DoublyLinkedList setParent(DoublyLinkedList l) {
- DoublyLinkedList old = parent;
- parent = l;
- return old;
- }
- }
-
- private class QueuedResendRequest extends BaseQueuedResend {
-
- @Override
- long initialActiveTime( long now) {
- return now; // Active immediately; reordering is rare
- }
-
- QueuedResendRequest(int packetNumber) {
- super(packetNumber);
- }
-
- @Override
- void sent() throws UpdatableSortedLinkedListKilledException {
- synchronized(resendRequestQueue) {
- super.sent();
- resendRequestQueue.update(this);
- }
- }
-
- @Override
- long urgentDelay() {
- return PacketSender.MAX_COALESCING_DELAY; // Urgent
pretty soon
- }
- }
-
- private class QueuedAckRequest extends BaseQueuedResend {
-
- final long createdTime;
- long activeDelay;
-
- @Override
- long initialActiveTime( long now) {
- // Request an ack after four RTTs
- activeDelay = twoRTTs();
- return now + activeDelay;
- }
-
- QueuedAckRequest(int packetNumber) {
- super(packetNumber);
- this.createdTime = System.currentTimeMillis();
- }
-
- @Override
- void sent() throws UpdatableSortedLinkedListKilledException {
- synchronized(ackRequestQueue) {
- super.sent();
- ackRequestQueue.update(this);
- }
- }
-
- /**
- * Acknowledged.
- */
- public void onAcked() {
- long t = Math.max(0, System.currentTimeMillis() -
createdTime);
- pn.reportPing(t);
- if(logMINOR)
- Logger.minor(this, "Reported round-trip time of
" + TimeUtil.formatTime(t, 2, true) + " on " + pn.getPeer() + " (avg " +
TimeUtil.formatTime((long) pn.averagePingTime(), 2, true) + ", #" +
packetNumber + ')');
- }
-
- @Override
- long urgentDelay() {
- return PacketSender.MAX_COALESCING_DELAY;
- }
- }
-
- /**
- * Called when we receive a packet.
- * @param seqNumber The packet's serial number.
- * See the comments in FNPPacketMangler.processOutgoing* for
- * the reason for the locking.
- */
- public synchronized void receivedPacket(int seqNumber) {
- timeLastDecodedPacket = System.currentTimeMillis();
- logMINOR = Logger.shouldLog(Logger.MINOR, this);
- if(logMINOR)
- Logger.minor(this, "Received packet " + seqNumber + "
from " + pn.shortToString());
- if(seqNumber == -1)
- return;
- // FIXME delete this log statement
- if(logMINOR)
- Logger.minor(this, "Still received packet: " +
seqNumber);
- // Received packet
- receivedPacketNumber(seqNumber);
- // Ack it even if it is a resend
- queueAck(seqNumber);
- }
-
- // TCP uses four RTTs with no ack to resend ... but we have a more
drawn out protocol, we
- // should use only two.
- public long twoRTTs() {
- // FIXME upper bound necessary ?
- return (long) Math.min(Math.max(250, pn.averagePingTime() * 2),
2500);
- }
-
- protected void receivedPacketNumber(int seqNumber) {
- if(logMINOR)
- Logger.minor(this, "Handling received packet number " +
seqNumber);
- queueResendRequests(seqNumber);
- packetNumbersReceived.got(seqNumber);
- try {
- removeResendRequest(seqNumber);
- } catch(UpdatableSortedLinkedListKilledException e) {
- // Ignore, not our problem
- }
- synchronized(this) {
- highestSeenIncomingSerialNumber =
Math.max(highestSeenIncomingSerialNumber, seqNumber);
- }
- if(logMINOR)
- Logger.minor(this, "Handled received packet number " +
seqNumber);
- }
-
- /**
- * Remove a resend request from the queue.
- * @param seqNumber
- * @throws UpdatableSortedLinkedListKilledException
- */
- private void removeResendRequest(int seqNumber) throws
UpdatableSortedLinkedListKilledException {
- synchronized(resendRequestQueue) {
- resendRequestQueue.removeByKey(seqNumber);
- }
- }
-
- /**
- * Add some resend requests if necessary.
- * @param seqNumber The number of the packet we just received.
- */
- private void queueResendRequests(int seqNumber) {
- int max;
- synchronized(this) {
- max = packetNumbersReceived.highest();
- }
- if(seqNumber > max)
- try {
- if((max != -1) && (seqNumber - max > 1)) {
- if(logMINOR)
- Logger.minor(this, "Queueing
resends from " + max + " to " + seqNumber);
- // Missed some packets out
- for(int i = max + 1; i < seqNumber;
i++) {
- queueResendRequest(i);
- }
- }
- } catch(UpdatableSortedLinkedListKilledException e) {
- // Ignore (we are decoding packet, not sending
one)
- }
- }
-
- /**
- * Queue a resend request
- * @param packetNumber the packet serial number to queue a
- * resend request for
- * @throws UpdatableSortedLinkedListKilledException
- */
- private void queueResendRequest(int packetNumber) throws
UpdatableSortedLinkedListKilledException {
- synchronized(resendRequestQueue) {
- if(queuedResendRequest(packetNumber)) {
- if(logMINOR)
- Logger.minor(this, "Not queueing resend
request for " + packetNumber + " - already queued");
- return;
- }
- if(logMINOR)
- Logger.minor(this, "Queueing resend request for
" + packetNumber);
- QueuedResendRequest qrr = new
QueuedResendRequest(packetNumber);
- resendRequestQueue.add(qrr);
- }
- }
-
- /**
- * Queue an ack request
- * @param packetNumber the packet serial number to queue a
- * resend request for
- * @throws UpdatableSortedLinkedListKilledException
- */
- private void queueAckRequest(int packetNumber) throws
UpdatableSortedLinkedListKilledException {
- synchronized(ackRequestQueue) {
- // FIXME should we just remove the existing ack
request? If we do, we get a better
- // estimate of RTT on lossy links... if we don't, lossy
links will include the average
- // time to send a packet including all resends. The
latter may be useful, and in fact
- // the former is unreliable...
- if(queuedAckRequest(packetNumber)) {
- if(logMINOR)
- Logger.minor(this, "Not queueing ack
request for " + packetNumber + " - already queued");
- return;
- }
- if(logMINOR)
- Logger.minor(this, "Queueing ack request for "
+ packetNumber + " on " + this);
- QueuedAckRequest qrr = new
QueuedAckRequest(packetNumber);
- ackRequestQueue.add(qrr);
- }
- }
-
- /**
- * Is an ack request queued for this packet number?
- */
- private boolean queuedAckRequest(int packetNumber) {
- synchronized(ackRequestQueue) {
- return ackRequestQueue.containsKey(packetNumber);
- }
- }
-
- /**
- * Is a resend request queued for this packet number?
- */
- private boolean queuedResendRequest(int packetNumber) {
- synchronized(resendRequestQueue) {
- return resendRequestQueue.containsKey(packetNumber);
- }
- }
-
- /**
- * Called when we have received several packet acknowledgements.
- * Synchronized for the same reason as the sender code is:
- * So that we don't end up sending packets too late when overloaded,
- * and get horrible problems such as asking to resend packets which
- * haven't been sent yet.
- */
- public synchronized void acknowledgedPackets(int[] seqNos) {
- AsyncMessageCallback[][] callbacks = new
AsyncMessageCallback[seqNos.length][];
- for(int i = 0; i < seqNos.length; i++) {
- int realSeqNo = seqNos[i];
- if(logMINOR)
- Logger.minor(this, "Acknowledged packet: " +
realSeqNo);
- try {
- removeAckRequest(realSeqNo);
- } catch(UpdatableSortedLinkedListKilledException e) {
- // Ignore, we are processing an incoming packet
- }
- if(logMINOR)
- Logger.minor(this, "Removed ack request");
- callbacks[i] =
sentPacketsContents.getCallbacks(realSeqNo);
- byte[] buf = sentPacketsContents.get(realSeqNo);
- long timeAdded = sentPacketsContents.getTime(realSeqNo);
- if(sentPacketsContents.remove(realSeqNo))
- if(buf.length > Node.PACKET_SIZE) {
- PacketThrottle throttle =
pn.getThrottle();
- throttle.notifyOfPacketAcknowledged();
-
throttle.setRoundTripTime(System.currentTimeMillis() - timeAdded);
- }
- }
- int cbCount = 0;
- for(int i = 0; i < callbacks.length; i++) {
- AsyncMessageCallback[] cbs = callbacks[i];
- if(cbs != null)
- for(int j = 0; j < cbs.length; j++) {
- cbs[j].acknowledged();
- cbCount++;
- }
- }
- if(cbCount > 0 && logMINOR)
- Logger.minor(this, "Executed " + cbCount + "
callbacks");
- try {
- wouldBlock(true);
- } catch(BlockedTooLongException e) {
- // Ignore, will come up again. In any case it's rather
unlikely...
- }
- }
-
- /**
- * Called when we have received a packet acknowledgement.
- * @param realSeqNo
- */
- public void acknowledgedPacket(int realSeqNo) {
- logMINOR = Logger.shouldLog(Logger.MINOR, this);
- AsyncMessageCallback[] callbacks;
- if(logMINOR)
- Logger.minor(this, "Acknowledged packet: " + realSeqNo);
- try {
- synchronized(this) {
- removeAckRequest(realSeqNo);
- }
- } catch(UpdatableSortedLinkedListKilledException e) {
- // Ignore, we are processing an incoming packet
- }
- if(logMINOR)
- Logger.minor(this, "Removed ack request");
- callbacks = sentPacketsContents.getCallbacks(realSeqNo);
- byte[] buf = sentPacketsContents.get(realSeqNo);
- long timeAdded = sentPacketsContents.getTime(realSeqNo);
- if(sentPacketsContents.remove(realSeqNo))
- if(buf.length > Node.PACKET_SIZE) {
- PacketThrottle throttle = pn.getThrottle();
- throttle.notifyOfPacketAcknowledged();
-
throttle.setRoundTripTime(System.currentTimeMillis() - timeAdded);
- }
- try {
- wouldBlock(true);
- } catch(BlockedTooLongException e) {
- // Ignore, will come up again. In any case it's rather
unlikely...
- }
- if(callbacks != null) {
- for(int i = 0; i < callbacks.length; i++)
- callbacks[i].acknowledged();
- if(logMINOR)
- Logger.minor(this, "Executed " +
callbacks.length + " callbacks");
- }
- }
-
- /**
- * Remove an ack request from the queue by packet number.
- * @throws UpdatableSortedLinkedListKilledException
- */
- private void removeAckRequest(int seqNo) throws
UpdatableSortedLinkedListKilledException {
- QueuedAckRequest qr = null;
-
- synchronized(ackRequestQueue) {
- qr = (QueuedAckRequest)
ackRequestQueue.removeByKey(seqNo);
- }
- if(qr != null)
- qr.onAcked();
- else
- Logger.normal(this, "Removing ack request twice? Null
on " + seqNo + " from " + pn.getPeer() + " (" + TimeUtil.formatTime((int)
pn.averagePingTime(), 2, true) + " ping avg)");
- }
-
- /**
- * Resend (off-thread but ASAP) a packet.
- * @param seqNumber The serial number of the packet to be
- * resent.
- */
- public void resendPacket(int seqNumber) {
- byte[] resendData = sentPacketsContents.get(seqNumber);
- if(resendData != null) {
- if(resendData.length > Node.PACKET_SIZE)
- pn.getThrottle().notifyOfPacketLost();
- synchronized(packetsToResend) {
- packetsToResend.add(seqNumber);
- }
- pn.node.ps.wakeUp();
- } else {
- String msg = "Asking me to resend packet " + seqNumber +
- " which we haven't sent yet or which they have
already acked (next=" + nextPacketNumber + ')';
- // Might just be late, but could indicate something
serious.
- if(isDeprecated) {
- if(logMINOR)
- Logger.minor(this, "Other side wants us
to resend packet " + seqNumber + " for " + this + " - we cannot do this because
we are deprecated");
- } else
- Logger.normal(this, msg);
- }
- }
-
- /**
- * Called when we receive an AckRequest.
- * @param packetNumber The packet that the other side wants
- * us to re-ack.
- */
- public synchronized void receivedAckRequest(int packetNumber) {
- if(queuedAck(packetNumber)) {
- // Already going to send an ack
- // Don't speed it up though; wasteful
- } else if(packetNumbersReceived.contains(packetNumber))
- // We have received it, so send them an ack
- queueAck(packetNumber);
- else {
- // We have not received it, so get them to resend it
- try {
- queueResendRequest(packetNumber);
- } catch(UpdatableSortedLinkedListKilledException e) {
- // Ignore, we are decoding, not sending.
- }
- highestSeenIncomingSerialNumber =
Math.max(highestSeenIncomingSerialNumber, packetNumber);
- }
- }
-
- /**
- * Is there a queued ack with the given packet number?
- * FIXME: have a hashtable? The others do, but probably it
- * isn't necessary. We should be consistent about this -
- * either take it out of UpdatableSortedLinkedListWithForeignIndex,
- * or add one here.
- */
- private boolean queuedAck(int packetNumber) {
- synchronized(ackQueue) {
- for(QueuedAck qa : ackQueue) {
- if(qa.packetNumber == packetNumber)
- return true;
- }
- }
- return false;
- }
-
- /**
- * Destination forgot a packet.
- * This is normal if we are the secondary key.
- * @param seqNumber The packet number lost.
- */
- public void destForgotPacket(int seqNumber) {
- if(isDeprecated)
- Logger.normal(this, "Destination forgot packet: " +
seqNumber);
- else
- Logger.error(this, "Destination forgot packet: " +
seqNumber);
- synchronized(this) {
- try {
- removeResendRequest(seqNumber);
- } catch(UpdatableSortedLinkedListKilledException e) {
- // Ignore
- }
- }
- }
-
- /**
- * @return A packet number for a new outgoing packet.
- * This method will block until one is available if
- * necessary.
- * @throws KeyChangedException if the thread is interrupted when waiting
- */
- public int allocateOutgoingPacketNumber() throws KeyChangedException,
NotConnectedException {
- int packetNumber;
- if(!pn.isConnected())
- throw new NotConnectedException();
- synchronized(this) {
- if(isDeprecated)
- throw new KeyChangedException();
- packetNumber = nextPacketNumber++;
- if(logMINOR)
- Logger.minor(this, "Allocated " + packetNumber
+ " in allocateOutgoingPacketNumber for " + this);
- }
- while(true) {
- try {
- sentPacketsContents.lock(packetNumber);
- if(logMINOR)
- Logger.minor(this, "Locked " +
packetNumber);
- synchronized(this) {
- timeWouldBlock = -1;
- }
- return packetNumber;
- } catch(InterruptedException e) {
- synchronized(this) {
- if(isDeprecated)
- throw new KeyChangedException();
- }
- }
- }
- }
- private long timeWouldBlock = -1;
- static final long MAX_WOULD_BLOCK_DELTA = 10 * 60 * 1000;
-
- public boolean wouldBlock(boolean wakeTicker) throws
BlockedTooLongException {
- long now = System.currentTimeMillis();
- synchronized(this) {
- if(sentPacketsContents.wouldBlock(nextPacketNumber)) {
- if(timeWouldBlock == -1)
- timeWouldBlock = now;
- else {
- long delta = now - timeWouldBlock;
- if(delta > MAX_WOULD_BLOCK_DELTA) {
- Logger.error(this, "Not been
able to allocate a packet to tracker " + this + " for " +
TimeUtil.formatTime(delta, 3, true));
- throw new
BlockedTooLongException(this, delta);
- }
- }
- return true;
- } else
- if(timeWouldBlock != -1) {
- long delta = now - timeWouldBlock;
- timeWouldBlock = -1;
- if(delta >
PacketSender.MAX_COALESCING_DELAY)
- Logger.error(this, "Waking
PacketSender: have been blocking for packet ack for " +
TimeUtil.formatTime(delta, 3, true));
- else
- return false;
- } else
- return false;
- }
- pn.node.ps.wakeUp();
- return false;
- }
-
- /**
- * @return A packet number for a new outgoing packet.
- * This method will not block, and will throw an exception
- * if it would need to block.
- * @throws KeyChangedException if the thread is interrupted when waiting
- */
- public int allocateOutgoingPacketNumberNeverBlock() throws
KeyChangedException, NotConnectedException, WouldBlockException {
- int packetNumber;
- if(!pn.isConnected())
- throw new NotConnectedException();
- synchronized(this) {
- packetNumber = nextPacketNumber;
- if(isDeprecated)
- throw new KeyChangedException();
- sentPacketsContents.lockNeverBlock(packetNumber);
- timeWouldBlock = -1;
- nextPacketNumber = packetNumber + 1;
- if(logMINOR)
- Logger.minor(this, "Allocated " + packetNumber
+ " in allocateOutgoingPacketNumberNeverBlock for " + this);
- return packetNumber;
- }
- }
-
- public int[] grabForgotten() {
- if(logMINOR)
- Logger.minor(this, "Grabbing forgotten packet numbers");
- int[] acks;
- synchronized(forgottenQueue) {
- // Grab the acks and tell them they are sent
- int length = forgottenQueue.size();
- acks = new int[length];
- int i = 0;
-
- Iterator it = forgottenQueue.iterator();
- while(it.hasNext()) {
- QueuedForgotten ack = (QueuedForgotten)
it.next();
- acks[i++] = ack.packetNumber;
- if(logMINOR)
- Logger.minor(this, "Grabbing ack " +
ack.packetNumber + " from " + this);
- it.remove(); // sent
- }
- }
- return acks;
- }
-
- public void requeueForgot(int[] forgotPackets, int start, int length) {
- synchronized(forgottenQueue) { // It doesn't do anything else
does it? REDFLAG
- for(int i = start; i < start + length; i++) {
- queueForgotten(i, false);
- }
- }
- }
-
- /**
- * Grab all the currently queued acks to be sent to this node.
- * @return An array of packet numbers that we need to acknowledge.
- */
- public int[] grabAcks() {
- if(logMINOR)
- Logger.minor(this, "Grabbing acks");
- int[] acks;
- synchronized(ackQueue) {
- // Grab the acks and tell them they are sent
- int length = ackQueue.size();
- acks = new int[length];
- int i = 0;
- Iterator<QueuedAck> it = ackQueue.iterator();
- while(it.hasNext()) {
- QueuedAck ack = it.next();
- acks[i++] = ack.packetNumber;
- if(logMINOR)
- Logger.minor(this, "Grabbing ack " +
ack.packetNumber + " from " + this);
- it.remove(); // sent
- }
- }
- return acks;
- }
-
- /**
- * Grab all the currently queued resend requests.
- * @return An array of the packet numbers of all the packets we want to
be resent.
- * @throws NotConnectedException If the peer is no longer connected.
- */
- public int[] grabResendRequests() throws NotConnectedException {
- UpdatableSortedLinkedListItem[] items;
- int[] packetNumbers;
- int realLength;
- long now = System.currentTimeMillis();
- try {
- synchronized(resendRequestQueue) {
- items = resendRequestQueue.toArray();
- int length = items.length;
- packetNumbers = new int[length];
- realLength = 0;
- for(int i = 0; i < length; i++) {
- QueuedResendRequest qrr =
(QueuedResendRequest) items[i];
-
if(packetNumbersReceived.contains(qrr.packetNumber)) {
- if(logMINOR)
- Logger.minor(this,
"Have already seen " + qrr.packetNumber + ", removing from resend list");
- resendRequestQueue.remove(qrr);
- continue;
- }
- if(qrr.activeTime <= now) {
- packetNumbers[realLength++] =
qrr.packetNumber;
- if(logMINOR)
- Logger.minor(this,
"Grabbing resend request: " + qrr.packetNumber + " from " + this);
- qrr.sent();
- } else if(logMINOR)
- Logger.minor(this, "Rejecting
resend request: " + qrr.packetNumber + " - in future by " + (qrr.activeTime -
now) + "ms for " + this);
- }
- }
- } catch(UpdatableSortedLinkedListKilledException e) {
- throw new NotConnectedException();
- }
- int[] trimmedPacketNumbers = new int[realLength];
- System.arraycopy(packetNumbers, 0, trimmedPacketNumbers, 0,
realLength);
- return trimmedPacketNumbers;
- }
-
- public int[] grabAckRequests() throws NotConnectedException,
StillNotAckedException {
- UpdatableSortedLinkedListItem[] items;
- int[] packetNumbers;
- int realLength;
- if(logMINOR)
- Logger.minor(this, "Grabbing ack requests");
- try {
- synchronized(ackRequestQueue) {
- long now = System.currentTimeMillis();
- items = ackRequestQueue.toArray();
- int length = items.length;
- packetNumbers = new int[length];
- realLength = 0;
- for(int i = 0; i < length; i++) {
- QueuedAckRequest qr =
(QueuedAckRequest) items[i];
- int packetNumber = qr.packetNumber;
- if(qr.activeTime <= now) {
-
if(sentPacketsContents.get(packetNumber) == null) {
- if(logMINOR)
-
Logger.minor(this, "Asking to ack packet which has already been acked: " +
packetNumber + " on " + this + ".grabAckRequests");
-
ackRequestQueue.remove(qr);
- continue;
- }
- if(now - qr.createdTime > 2 *
60 * 1000) {
- if(logMINOR)
-
Logger.minor(this, "Packet " + qr.packetNumber + " sent over " + (now -
qr.createdTime) + "ms ago and still not acked on " + this + " for " + pn);
- if(now - qr.createdTime
> 10 * 60 * 1000) {
-
Logger.error(this, "Packet " + qr.packetNumber + " sent over " + (now -
qr.createdTime) + "ms ago and still not acked on " + this + " for " + pn);
- throw new
StillNotAckedException(this);
- }
- }
- packetNumbers[realLength++] =
packetNumber;
- if(logMINOR)
- Logger.minor(this,
"Grabbing ack request " + packetNumber + " (" + realLength + ") from " + this);
- qr.sent();
- } else if(logMINOR)
- Logger.minor(this, "Ignoring
ack request " + packetNumber + " (" + realLength + ") - will become active in "
+ (qr.activeTime - now) + "ms on " + this + " - " + qr);
- }
- }
- } catch(UpdatableSortedLinkedListKilledException e) {
- throw new NotConnectedException();
- }
- if(logMINOR)
- Logger.minor(this, "realLength now " + realLength);
- int[] trimmedPacketNumbers = new int[realLength];
- System.arraycopy(packetNumbers, 0, trimmedPacketNumbers, 0,
realLength);
- if(logMINOR)
- Logger.minor(this, "Returning " +
trimmedPacketNumbers.length + " ackRequests");
- return trimmedPacketNumbers;
- }
-
- /**
- * @return The time at which we will have to send some
- * notifications. Or Long.MAX_VALUE if there are none to send.
- */
- public long getNextUrgentTime() {
- long earliestTime = Long.MAX_VALUE;
- synchronized(ackQueue) {
- if(!ackQueue.isEmpty()) {
- QueuedAck qa = ackQueue.get(0);
- earliestTime = qa.urgentTime;
- }
- }
- synchronized(resendRequestQueue) {
- if(!resendRequestQueue.isEmpty()) {
- QueuedResendRequest qr = (QueuedResendRequest)
resendRequestQueue.getLowest();
- earliestTime = Math.min(earliestTime,
qr.urgentTime);
- }
- }
- synchronized(ackRequestQueue) {
- if(!ackRequestQueue.isEmpty()) {
- QueuedAckRequest qr = (QueuedAckRequest)
ackRequestQueue.getLowest();
- earliestTime = Math.min(earliestTime,
qr.urgentTime);
- }
- }
- return earliestTime;
- }
-
- /**
- * @return The last sent new packet number.
- */
- public int getLastOutgoingSeqNumber() {
- synchronized(this) {
- return nextPacketNumber - 1;
- }
- }
-
- /**
- * Report a packet has been sent
- * @param data The data we have just sent (payload only, decrypted).
- * @param seqNumber The packet number.
- * @throws NotConnectedException
- */
- public void sentPacket(byte[] data, int seqNumber,
AsyncMessageCallback[] callbacks, short priority) throws NotConnectedException {
- if(callbacks != null)
- for(int i = 0; i < callbacks.length; i++) {
- if(callbacks[i] == null)
- throw new NullPointerException();
- }
- sentPacketsContents.add(seqNumber, data, callbacks, priority);
- try {
- queueAckRequest(seqNumber);
- } catch(UpdatableSortedLinkedListKilledException e) {
- throw new NotConnectedException();
- }
- }
-
- /**
- * Clear the KeyTracker. Depreciate it, clear all resend, ack,
request-ack etc queues.
- * Return the messages we still had in flight. The caller will then
either add them to
- * another KeyTracker, or call their callbacks to indicate failure.
- */
- private LimitedRangeIntByteArrayMapElement[] clear() {
- if(logMINOR)
- Logger.minor(this, "Clearing " + this);
- isDeprecated = true;
- LimitedRangeIntByteArrayMapElement[] elements;
- synchronized(sentPacketsContents) {
- elements = sentPacketsContents.grabAll(); // will clear
- }
- synchronized(ackQueue) {
- ackQueue.clear();
- }
- synchronized(resendRequestQueue) {
- resendRequestQueue.kill();
- }
- synchronized(ackRequestQueue) {
- ackRequestQueue.kill();
- }
- synchronized(packetsToResend) {
- packetsToResend.clear();
- }
- packetNumbersReceived.clear();
- return elements;
- }
-
- /**
- * Completely deprecate the KeyTracker, in favour of a new one.
- * It will no longer be used for anything. The KeyTracker will be
cleared and all outstanding packets
- * moved to the new KeyTracker.
- *
- * *** Must only be called if the KeyTracker is not to be kept.
Otherwise, we may receive some packets twice. ***
- */
- public void completelyDeprecated(KeyTracker newTracker) {
- if(logMINOR)
- Logger.minor(this, "Completely deprecated: " + this + "
in favour of " + newTracker);
- LimitedRangeIntByteArrayMapElement[] elements = clear();
- if(elements.length == 0)
- return; // nothing more to do
- MessageItem[] messages = new MessageItem[elements.length];
- for(int i = 0; i < elements.length; i++) {
- LimitedRangeIntByteArrayMapElement element =
elements[i];
- byte[] buf = element.data;
- AsyncMessageCallback[] callbacks = element.callbacks;
- // Ignore packet#
- if(logMINOR)
- Logger.minor(this, "Queueing resend of what was
once " + element.packetNumber);
- messages[i] = new MessageItem(buf, callbacks, true,
pn.resendByteCounter, element.priority);
- }
- pn.requeueMessageItems(messages, 0, messages.length, true);
-
- pn.node.ps.wakeUp();
- }
-
- /**
- * Called when the node appears to have been disconnected.
- * Dump all sent messages.
- */
- public void disconnected() {
- // Clear everything, call the callbacks
- LimitedRangeIntByteArrayMapElement[] elements = clear();
- for(int i = 0; i < elements.length; i++) {
- LimitedRangeIntByteArrayMapElement element =
elements[i];
- AsyncMessageCallback[] callbacks = element.callbacks;
- if(callbacks != null)
- for(int j = 0; j < callbacks.length; j++)
- callbacks[j].disconnected();
- }
- }
-
- /**
- * Fill rpiTemp with ResendPacketItems of packets that need to be
- * resent.
- * @return An array of integers which contains the packet numbers
- * to be resent (the RPI's are put into rpiTemp), or null if there
- * are no packets to resend.
- *
- * Not a very nice API, but it saves a load of allocations, and at
- * least it's documented!
- */
- public int[] grabResendPackets(Vector<ResendPacketItem> rpiTemp, int[]
numbers) {
- rpiTemp.clear();
- long now = System.currentTimeMillis();
- long fourRTTs = twoRTTs();
- int count = 0;
- synchronized(packetsToResend) {
- int len = packetsToResend.size();
- if(numbers.length < len)
- numbers = new int[len * 2];
- for(Iterator<Integer> it = packetsToResend.iterator();
it.hasNext();) {
- int packetNo = it.next();
- long resentTime =
sentPacketsContents.getReaddedTime(packetNo);
- if(now - resentTime > fourRTTs) {
- // Either never resent, or resent at
least 4 RTTs ago
- numbers[count++] = packetNo;
- it.remove();
- }
- }
- packetsToResend.clear();
- }
- for(int i = 0; i < count; i++) {
- int packetNo = numbers[i];
- byte[] buf = sentPacketsContents.get(packetNo);
- if(buf == null) {
- if(logMINOR)
- Logger.minor(this, "Contents null for "
+ packetNo + " in grabResendPackets on " + this);
- continue; // acked already?
- }
- AsyncMessageCallback[] callbacks =
sentPacketsContents.getCallbacks(packetNo);
- short priority =
sentPacketsContents.getPriority(packetNo, DMT.PRIORITY_BULK_DATA);
- rpiTemp.add(new ResendPacketItem(buf, packetNo, this,
callbacks, priority));
- }
- if(rpiTemp.isEmpty())
- return null;
- return numbers;
- }
-
- public boolean hasPacketsToResend() {
- synchronized(packetsToResend) {
- return !packetsToResend.isEmpty();
- }
- }
-
- public boolean isDeprecated() {
- return this.isDeprecated;
- }
-
- public int countAckRequests() {
- synchronized(ackRequestQueue) {
- return ackRequestQueue.size();
- }
- }
-
- public int countResendRequests() {
- synchronized(resendRequestQueue) {
- return resendRequestQueue.size();
- }
- }
-
- public int countAcks() {
- synchronized(ackQueue) {
- return ackQueue.size();
- }
- }
-
- public synchronized long timeLastDecodedPacket() {
- return timeLastDecodedPacket;
- }
}
Modified: trunk/freenet/src/freenet/node/OutgoingPacketMangler.java
===================================================================
--- trunk/freenet/src/freenet/node/OutgoingPacketMangler.java 2008-12-02
13:48:21 UTC (rev 24011)
+++ trunk/freenet/src/freenet/node/OutgoingPacketMangler.java 2008-12-02
13:53:48 UTC (rev 24012)
@@ -34,8 +34,9 @@
/**
* Resend a single packet.
+ * @param kt The KeyTracker on which to send the packet.
*/
- public void resend(ResendPacketItem item) throws
PacketSequenceException, WouldBlockException, KeyChangedException,
NotConnectedException;
+ public void resend(ResendPacketItem item, KeyTracker kt) throws
PacketSequenceException, WouldBlockException, KeyChangedException,
NotConnectedException;
/**
* Build a packet and send it. From a Message recently converted into
byte[],
Modified: trunk/freenet/src/freenet/node/PeerNode.java
===================================================================
--- trunk/freenet/src/freenet/node/PeerNode.java 2008-12-02 13:48:21 UTC
(rev 24011)
+++ trunk/freenet/src/freenet/node/PeerNode.java 2008-12-02 13:53:48 UTC
(rev 24012)
@@ -1005,7 +1005,7 @@
public boolean isConnected() {
long now = System.currentTimeMillis();
synchronized(this) {
- if(isConnected && currentTracker != null &&
!currentTracker.isDeprecated()) {
+ if(isConnected && currentTracker != null &&
!currentTracker.packets.isDeprecated()) {
timeLastConnected = now;
return true;
}
@@ -1168,9 +1168,9 @@
mi.onDisconnect();
}
}
- if(cur != null) cur.disconnected();
- if(prev != null) prev.disconnected();
- if(unv != null) unv.disconnected();
+ if(cur != null) cur.packets.disconnected();
+ if(prev != null) prev.packets.disconnected();
+ if(unv != null) unv.packets.disconnected();
if(_lastThrottle != null)
_lastThrottle.maybeDisconnected();
node.lm.lostOrRestartedNode(this);
@@ -1268,22 +1268,22 @@
}
KeyTracker kt = cur;
if(kt != null) {
- long next = kt.getNextUrgentTime();
+ long next = kt.packets.getNextUrgentTime();
t = Math.min(t, next);
if(next < now && logMINOR)
Logger.minor(this, "Next urgent time from
curTracker less than now");
- if(kt.hasPacketsToResend()) return now;
+ if(kt.packets.hasPacketsToResend()) return now;
}
kt = prev;
if(kt != null) {
- long next = kt.getNextUrgentTime();
+ long next = kt.packets.getNextUrgentTime();
t = Math.min(t, next);
if(next < now && logMINOR)
Logger.minor(this, "Next urgent time from
prevTracker less than now");
- if(kt.hasPacketsToResend()) return now;
+ if(kt.packets.hasPacketsToResend()) return now;
}
try {
- if(cur != null && !cur.wouldBlock(false))
+ if(cur != null && !cur.packets.wouldBlock(false))
t = messageQueue.getNextUrgentTime(t, now);
// If there isn't a current tracker, no point worrying
about it as we won't be able to send it anyway...
} catch (BlockedTooLongException e) {
@@ -1295,11 +1295,11 @@
private synchronized boolean mustSendNotificationsNow(long now) {
KeyTracker kt = currentTracker;
if(kt != null) {
- if(kt.getNextUrgentTime() < now) return true;
+ if(kt.packets.getNextUrgentTime() < now) return true;
}
kt = previousTracker;
if(kt != null)
- if(kt.getNextUrgentTime() < now) return true;
+ if(kt.packets.getNextUrgentTime() < now) return true;
return false;
}
@@ -1852,7 +1852,8 @@
routable = false;
} else
older = false;
- KeyTracker newTracker = new KeyTracker(this, encCipher, encKey);
+ PacketTracker packets = new PacketTracker(this);
+ KeyTracker newTracker = new KeyTracker(this, packets,
encCipher, encKey);
if(logMINOR) Logger.minor(this, "New key tracker in
completedHandshake: "+newTracker+" for "+shortToString());
changedIP(replyTo);
boolean bootIDChanged = false;
@@ -1904,7 +1905,7 @@
previousTracker =
unverifiedTracker;
}
unverifiedTracker = newTracker;
- if(currentTracker == null ||
currentTracker.isDeprecated())
+ if(currentTracker == null ||
currentTracker.packets.isDeprecated())
isConnected = false;
} else {
prev = currentTracker;
@@ -1941,11 +1942,11 @@
node.usm.onRestart(this);
}
if(oldPrev != null)
- oldPrev.completelyDeprecated(newTracker);
+ oldPrev.packets.completelyDeprecated(newTracker);
if(oldCur != null)
- oldCur.completelyDeprecated(newTracker);
+ oldCur.packets.completelyDeprecated(newTracker);
if(prev != null)
- prev.deprecated();
+ prev.packets.deprecated();
PacketThrottle throttle;
synchronized(this) {
throttle = _lastThrottle;
@@ -1978,8 +1979,8 @@
*/
private synchronized void maybeSwapTrackers() {
if(currentTracker == null || previousTracker == null) return;
- long delta = Math.abs(currentTracker.createdTime -
previousTracker.createdTime);
- if(previousTracker != null && (!previousTracker.isDeprecated())
&&
+ long delta = Math.abs(currentTracker.packets.createdTime -
previousTracker.packets.createdTime);
+ if(previousTracker != null &&
(!previousTracker.packets.isDeprecated()) &&
delta < CHECK_FOR_SWAPPED_TRACKERS_INTERVAL) {
// Swap prev and current iff H(new key) > H(old key).
// To deal with race conditions (node A gets 1 current
2 prev, node B gets 2 current 1 prev; when we rekey we lose data and cause
problems).
@@ -2012,7 +2013,7 @@
} else {
if (logMINOR)
Logger.minor(this, "Not swapping KeyTracker's:
previousTracker = " + previousTracker.toString()
- + (previousTracker.isDeprecated() ? "
(deprecated)" : "") + " time delta = " + delta);
+ +
(previousTracker.packets.isDeprecated() ? " (deprecated)" : "") + " time delta
= " + delta);
}
}
@@ -2066,7 +2067,7 @@
synchronized(this) {
if(sentInitialMessages)
return;
- if(currentTracker != null &&
!currentTracker.isDeprecated()) // FIXME is that possible?
+ if(currentTracker != null &&
!currentTracker.packets.isDeprecated()) // FIXME is that possible?
sentInitialMessages = true;
else
return;
@@ -2147,7 +2148,7 @@
long now = System.currentTimeMillis();
KeyTracker completelyDeprecatedTracker;
synchronized(this) {
- if(tracker == unverifiedTracker &&
!tracker.isDeprecated()) {
+ if(tracker == unverifiedTracker &&
!tracker.packets.isDeprecated()) {
if(logMINOR)
Logger.minor(this, "Promoting
unverified tracker " + tracker + " for " + getPeer());
completelyDeprecatedTracker = previousTracker;
@@ -2160,7 +2161,7 @@
ctx = null;
maybeSwapTrackers();
if(previousTracker != null)
- previousTracker.deprecated();
+ previousTracker.packets.deprecated();
} else
return;
}
@@ -2168,7 +2169,7 @@
setPeerNodeStatus(now);
node.peers.addConnectedPeer(this);
if(completelyDeprecatedTracker != null)
-
completelyDeprecatedTracker.completelyDeprecated(tracker);
+
completelyDeprecatedTracker.packets.completelyDeprecated(tracker);
}
private synchronized boolean invalidVersion() {
@@ -2434,7 +2435,7 @@
}
KeyTracker tracker = cur;
if(tracker != null) {
- long t = tracker.getNextUrgentTime();
+ long t = tracker.packets.getNextUrgentTime();
if(t < now || forceSendPrimary) {
try {
if(logMINOR) Logger.minor(this,
"Sending urgent notifications for current tracker on "+shortToString());
@@ -2454,7 +2455,7 @@
}
tracker = prev;
if(tracker != null) {
- long t = tracker.getNextUrgentTime();
+ long t = tracker.packets.getNextUrgentTime();
if(t < now)
try {
if(logMINOR) Logger.minor(this,
"Sending urgent notifications for previous tracker on "+shortToString());
@@ -2484,13 +2485,13 @@
cur = currentTracker;
prev = previousTracker;
}
- long t = prev.getNextUrgentTime();
- if(!(t > -1 && prev.timeLastDecodedPacket() > 0 && (now -
prev.timeLastDecodedPacket()) > 60*1000 &&
- cur.timeLastDecodedPacket() > 0 && (now -
cur.timeLastDecodedPacket() < 30*1000) &&
- (prev.countAckRequests() > 0 ||
prev.countResendRequests() > 0)))
+ long t = prev.packets.getNextUrgentTime();
+ if(!(t > -1 && prev.packets.timeLastDecodedPacket() > 0 && (now
- prev.packets.timeLastDecodedPacket()) > 60*1000 &&
+ cur.packets.timeLastDecodedPacket() > 0 && (now
- cur.packets.timeLastDecodedPacket() < 30*1000) &&
+ (prev.packets.countAckRequests() > 0 ||
prev.packets.countResendRequests() > 0)))
return;
Logger.error(this, "No packets decoded on "+prev+" for 60
seconds, deprecating in favour of cur: "+cur);
- prev.completelyDeprecated(cur);
+ prev.packets.completelyDeprecated(cur);
}
/**
@@ -2658,18 +2659,18 @@
if(item.pn != this)
throw new IllegalArgumentException("item.pn !=
this!");
KeyTracker kt = cur;
- if((kt != null) && (item.kt == kt)) {
- kt.resendPacket(item.packetNumber);
+ if((kt != null) && (item.kt == kt.packets)) {
+ kt.packets.resendPacket(item.packetNumber);
continue;
}
kt = prev;
- if((kt != null) && (item.kt == kt)) {
- kt.resendPacket(item.packetNumber);
+ if((kt != null) && (item.kt == kt.packets)) {
+ kt.packets.resendPacket(item.packetNumber);
continue;
}
kt = unv;
- if((kt != null) && (item.kt == kt)) {
- kt.resendPacket(item.packetNumber);
+ if((kt != null) && (item.kt == kt.packets)) {
+ kt.packets.resendPacket(item.packetNumber);
continue;
}
// Doesn't match any of these, need to resend the data
@@ -3155,16 +3156,16 @@
if(isConnected) {
if(currentTracker == null) {
if(unverifiedTracker != null) {
- if(unverifiedTracker.isDeprecated())
+
if(unverifiedTracker.packets.isDeprecated())
Logger.error(this, "Connected
but primary tracker is null and unverified is deprecated ! " +
unverifiedTracker + " for " + this, new Exception("debug"));
else if(logMINOR)
Logger.minor(this, "Connected
but primary tracker is null, but unverified = " + unverifiedTracker + " for " +
this, new Exception("debug"));
} else {
Logger.error(this, "Connected but both
primary and unverified are null on " + this, new Exception("debug"));
}
- } else if(currentTracker.isDeprecated()) {
+ } else if(currentTracker.packets.isDeprecated()) {
if(unverifiedTracker != null) {
- if(unverifiedTracker.isDeprecated())
+
if(unverifiedTracker.packets.isDeprecated())
Logger.error(this, "Connected
but primary tracker is deprecated, unverified is deprecated: primary=" +
currentTracker + " unverified: " + unverifiedTracker + " for " + this, new
Exception("debug"));
else if(logMINOR)
Logger.minor(this, "Connected,
primary tracker deprecated, unverified is valid, " + unverifiedTracker + " for
" + this, new Exception("debug"));
@@ -4063,7 +4064,7 @@
kt = getPreviousKeyTracker();
if(kt == null)
continue;
- int[] tmp = kt.grabResendPackets(rpiTemp, rpiIntTemp);
+ int[] tmp = kt.packets.grabResendPackets(rpiTemp,
rpiIntTemp);
if(tmp == null)
continue;
rpiIntTemp = tmp;
@@ -4073,7 +4074,7 @@
try {
if(logMINOR)
Logger.minor(this, "Resending "
+ item.packetNumber + " to " + item.kt);
- getOutgoingMangler().resend(item);
+ getOutgoingMangler().resend(item, kt);
return true;
} catch(KeyChangedException e) {
Logger.error(this, "Caught " + e + "
resending packets to " + kt);
Modified: trunk/freenet/src/freenet/node/ResendPacketItem.java
===================================================================
--- trunk/freenet/src/freenet/node/ResendPacketItem.java 2008-12-02
13:48:21 UTC (rev 24011)
+++ trunk/freenet/src/freenet/node/ResendPacketItem.java 2008-12-02
13:53:48 UTC (rev 24012)
@@ -10,7 +10,7 @@
* message as byte[].
*/
public class ResendPacketItem {
- public ResendPacketItem(byte[] payload, int packetNumber, KeyTracker k,
AsyncMessageCallback[] callbacks, short priority) {
+ public ResendPacketItem(byte[] payload, int packetNumber, PacketTracker k,
AsyncMessageCallback[] callbacks, short priority) {
pn = k.pn;
kt = k;
buf = payload;
@@ -19,7 +19,7 @@
this.priority = priority;
}
final PeerNode pn;
- final KeyTracker kt;
+ final PacketTracker kt;
final byte[] buf;
final int packetNumber;
final AsyncMessageCallback[] callbacks;
Modified: trunk/freenet/src/freenet/node/StillNotAckedException.java
===================================================================
--- trunk/freenet/src/freenet/node/StillNotAckedException.java 2008-12-02
13:48:21 UTC (rev 24011)
+++ trunk/freenet/src/freenet/node/StillNotAckedException.java 2008-12-02
13:53:48 UTC (rev 24012)
@@ -12,10 +12,10 @@
*/
public class StillNotAckedException extends Exception {
- public StillNotAckedException(KeyTracker tracker) {
+ public StillNotAckedException(PacketTracker tracker) {
this.tracker = tracker;
}
- final KeyTracker tracker;
+ final PacketTracker tracker;
}