Author: toad
Date: 2008-12-02 13:53:48 +0000 (Tue, 02 Dec 2008)
New Revision: 24012

Modified:
   trunk/freenet/src/freenet/node/BlockedTooLongException.java
   trunk/freenet/src/freenet/node/FNPPacketMangler.java
   trunk/freenet/src/freenet/node/KeyTracker.java
   trunk/freenet/src/freenet/node/OutgoingPacketMangler.java
   trunk/freenet/src/freenet/node/PeerNode.java
   trunk/freenet/src/freenet/node/ResendPacketItem.java
   trunk/freenet/src/freenet/node/StillNotAckedException.java
Log:
REFACTOR: KeyTracker -> PacketTracker + KeyTracker.
KeyTracker is now simply a session key; PacketTracker contains all the 
retransmission and acking logic.
Each KeyTracker has a fixed PacketTracker. However, a PacketTracker may (soon) be 
used by multiple KeyTrackers.


Modified: trunk/freenet/src/freenet/node/BlockedTooLongException.java
===================================================================
--- trunk/freenet/src/freenet/node/BlockedTooLongException.java 2008-12-02 
13:48:21 UTC (rev 24011)
+++ trunk/freenet/src/freenet/node/BlockedTooLongException.java 2008-12-02 
13:53:48 UTC (rev 24012)
@@ -2,10 +2,10 @@

 public class BlockedTooLongException extends Exception {

-       public final KeyTracker tracker;
+       public final PacketTracker tracker;
        public final long delta;

-       public BlockedTooLongException(KeyTracker tracker, long delta) {
+       public BlockedTooLongException(PacketTracker tracker, long delta) {
                this.tracker = tracker;
                this.delta = delta;
        }

Modified: trunk/freenet/src/freenet/node/FNPPacketMangler.java
===================================================================
--- trunk/freenet/src/freenet/node/FNPPacketMangler.java        2008-12-02 
13:48:21 UTC (rev 24011)
+++ trunk/freenet/src/freenet/node/FNPPacketMangler.java        2008-12-02 
13:53:48 UTC (rev 24012)
@@ -1773,7 +1773,9 @@
                                (seqBuf[2] & 0xff)) << 8) +
                                (seqBuf[3] & 0xff);

-               int targetSeqNumber = 
tracker.highestReceivedIncomingSeqNumber();
+               PacketTracker packets = tracker.packets;
+               
+               int targetSeqNumber = 
packets.highestReceivedIncomingSeqNumber();
                if(logMINOR) Logger.minor(this, "Seqno: "+seqNumber+" (highest 
seen "+targetSeqNumber+") receiving packet from "+tracker.pn.getPeer());

                if(seqNumber == -1) {
@@ -1924,7 +1926,8 @@
                        acks[i] = referenceSeqNumber - offset;
                }

-               tracker.acknowledgedPackets(acks);
+               PacketTracker packets = tracker.packets;
+               packets.acknowledgedPackets(acks);

                int retransmitCount = decrypted[ptr++] & 0xff;
                if(logMINOR) Logger.minor(this, "Retransmit requests: 
"+retransmitCount);
@@ -1936,7 +1939,7 @@
                        }
                        int realSeqNo = referenceSeqNumber - offset;
                        if(logMINOR) Logger.minor(this, "RetransmitRequest: 
"+realSeqNo);
-                       tracker.resendPacket(realSeqNo);
+                       packets.resendPacket(realSeqNo);
                }

                int ackRequestsCount = decrypted[ptr++] & 0xff;
@@ -1951,7 +1954,7 @@
                        }
                        int realSeqNo = realSeqNumber - offset;
                        if(logMINOR) Logger.minor(this, "AckRequest: 
"+realSeqNo);
-                       tracker.receivedAckRequest(realSeqNo);
+                       packets.receivedAckRequest(realSeqNo);
                }

                int forgottenCount = decrypted[ptr++] & 0xff;
@@ -1963,7 +1966,7 @@
                                Logger.error(this, "Packet not long enough at 
byte "+ptr+" on "+tracker);
                        }
                        int realSeqNo = realSeqNumber - offset;
-                       tracker.destForgotPacket(realSeqNo);
+                       packets.destForgotPacket(realSeqNo);
                }

                tracker.pn.receivedPacket(false, true); // Must keep the 
connection open, even if it's an ack packet only and on an incompatible 
connection - we may want to do a UOM transfer e.g.
@@ -1971,13 +1974,13 @@

                // No sequence number == no messages

-               if((seqNumber != -1) && tracker.alreadyReceived(seqNumber)) {
-                       tracker.queueAck(seqNumber); // Must keep the 
connection open!
+               if((seqNumber != -1) && packets.alreadyReceived(seqNumber)) {
+                       packets.queueAck(seqNumber); // Must keep the 
connection open!
                        if(logMINOR) Logger.minor(this, "Received packet twice 
("+seqNumber+") from "+tracker.pn.getPeer()+": "+seqNumber+" 
("+TimeUtil.formatTime((long) tracker.pn.averagePingTime(), 2, true)+" ping 
avg)");
                        return;
                }

-               tracker.receivedPacket(seqNumber);
+               packets.receivedPacket(seqNumber);

                if(seqNumber == -1) {
                        if(logMINOR) Logger.minor(this, "Returning because 
seqno = "+seqNumber);
@@ -2031,7 +2034,8 @@
                        }
                        return false;
                }
-               if(kt.wouldBlock(false)) {
+               PacketTracker packets = kt.packets;
+               if(packets.wouldBlock(false)) {
                        if(logMINOR) Logger.minor(this, "Would block: "+kt);
                        // Requeue
                        if(!dontRequeue) {
@@ -2040,7 +2044,7 @@
                        return false;
                }
                int length = 1;
-               length += kt.countAcks() + kt.countAckRequests() + 
kt.countResendRequests();
+               length += packets.countAcks() + packets.countAckRequests() + 
packets.countResendRequests();
                int callbacksCount = 0;
                int x = 0;
                String mi_name = null;
@@ -2052,7 +2056,7 @@
                        if(mi.formatted) {
                                try {
                                        byte[] buf = mi.getData();
-                                       int packetNumber = 
kt.allocateOutgoingPacketNumberNeverBlock();
+                                       int packetNumber = 
packets.allocateOutgoingPacketNumberNeverBlock();
                                        int size = 
processOutgoingPreformatted(buf, 0, buf.length, kt, packetNumber, mi.cb, 
mi.getPriority());
                                        //MARK: onSent()
                                        mi.onSent(size);
@@ -2165,7 +2169,7 @@
                                requeueLogString = ", requeueing remaining 
messages";
                        }
                        length = 1;
-                       length += kt.countAcks() + kt.countAckRequests() + 
kt.countResendRequests();
+                       length += packets.countAcks() + 
packets.countAckRequests() + packets.countResendRequests();
                        int count = 0;
                        int lastIndex = 0;
                        if(logMINOR)
@@ -2282,8 +2286,9 @@
                                        Logger.normal(this, "Dropping packet: 
Not connected to "+peer.getPeer()+" yet(2)");
                                        throw new NotConnectedException();
                                }
-                               int seqNo = neverWaitForPacketNumber ? 
tracker.allocateOutgoingPacketNumberNeverBlock() :
-                                       tracker.allocateOutgoingPacketNumber();
+                               PacketTracker packets = tracker.packets;
+                               int seqNo = neverWaitForPacketNumber ? 
packets.allocateOutgoingPacketNumberNeverBlock() :
+                                       packets.allocateOutgoingPacketNumber();
                                return processOutgoingPreformatted(buf, offset, 
length, tracker, seqNo, callbacks, priority);
                        } catch (KeyChangedException e) {
                                Logger.normal(this, "Key changed(2) for 
"+peer.getPeer());
@@ -2331,6 +2336,7 @@

                int[] acks, resendRequests, ackRequests, forgotPackets;
                int seqNumber;
+               PacketTracker packets = tracker.packets;
                /* Locking:
                 * Avoid allocating a packet number, then a long pause due to 
                 * overload, during which many other packets are sent, 
@@ -2350,7 +2356,7 @@
                                // Ack/resendreq only packet
                                seqNumber = -1;
                        else
-                               seqNumber = 
tracker.allocateOutgoingPacketNumberNeverBlock();
+                               seqNumber = 
packets.allocateOutgoingPacketNumberNeverBlock();
                }

                if(logMINOR) Logger.minor(this, "Sequence number (sending): 
"+seqNumber+" ("+packetNumber+") to "+tracker.pn.getPeer());
@@ -2363,12 +2369,12 @@

                try {
                synchronized(tracker) {
-                       acks = tracker.grabAcks();
-                       forgotPackets = tracker.grabForgotten();
-                       resendRequests = tracker.grabResendRequests();
-                       ackRequests = tracker.grabAckRequests();
-                       realSeqNumber = tracker.getLastOutgoingSeqNumber();
-                       otherSideSeqNumber = 
tracker.highestReceivedIncomingSeqNumber();
+                       acks = packets.grabAcks();
+                       forgotPackets = packets.grabForgotten();
+                       resendRequests = packets.grabResendRequests();
+                       ackRequests = packets.grabAckRequests();
+                       realSeqNumber = packets.getLastOutgoingSeqNumber();
+                       otherSideSeqNumber = 
packets.highestReceivedIncomingSeqNumber();
                        if(logMINOR) Logger.minor(this, "Sending packet to 
"+tracker.pn.getPeer()+", other side max seqno: "+otherSideSeqNumber);
                }
                } catch (StillNotAckedException e) {
@@ -2516,7 +2522,7 @@
                                if(logMINOR) Logger.minor(this, "Forgot packet 
"+i+": "+seq);
                                int offsetSeq = realSeqNumber - seq;
                                if((offsetSeq > 255) || (offsetSeq < 0)) {
-                                       if(tracker.isDeprecated()) {
+                                       if(packets.isDeprecated()) {
                                                // Oh well
                                                Logger.error(this, "Dropping 
forgot-packet notification on deprecated tracker: "+seq+" on "+tracker+" - real 
seq="+realSeqNumber);
                                                // Ignore it
@@ -2531,7 +2537,7 @@
                                        forgotOffsets[i] = (byte) offsetSeq;
                                        forgotCount++;
                                        if(forgotCount == 256)
-                                               
tracker.requeueForgot(forgotPackets, forgotCount, forgotPackets.length - 
forgotCount);
+                                               
packets.requeueForgot(forgotPackets, forgotCount, forgotPackets.length - 
forgotCount);
                                }
                        }
                }
@@ -2561,7 +2567,7 @@
                if(seqNumber != -1) {
                        byte[] saveable = new byte[length];
                        System.arraycopy(buf, offset, saveable, 0, length);
-                       tracker.sentPacket(saveable, seqNumber, callbacks, 
priority);
+                       packets.sentPacket(saveable, seqNumber, callbacks, 
priority);
                }

                if(logMINOR) Logger.minor(this, "Sending... "+seqNumber);
@@ -2782,8 +2788,8 @@
                return !((PeerNode)context).isConnected();
        }

-       public void resend(ResendPacketItem item) throws 
PacketSequenceException, WouldBlockException, KeyChangedException, 
NotConnectedException {
-               int size = processOutgoingPreformatted(item.buf, 0, 
item.buf.length, item.kt, item.packetNumber, item.callbacks, item.priority);
+       public void resend(ResendPacketItem item, KeyTracker tracker) throws 
PacketSequenceException, WouldBlockException, KeyChangedException, 
NotConnectedException {
+               int size = processOutgoingPreformatted(item.buf, 0, 
item.buf.length, tracker, item.packetNumber, item.callbacks, item.priority);
                item.pn.resendByteCounter.sentBytes(size);
        }


Modified: trunk/freenet/src/freenet/node/KeyTracker.java
===================================================================
--- trunk/freenet/src/freenet/node/KeyTracker.java      2008-12-02 13:48:21 UTC 
(rev 24011)
+++ trunk/freenet/src/freenet/node/KeyTracker.java      2008-12-02 13:53:48 UTC 
(rev 24012)
@@ -3,1133 +3,35 @@
  * http://www.gnu.org/ for further details of the GPL. */
 package freenet.node;

-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Vector;
-
 import freenet.crypt.BlockCipher;
-import freenet.io.comm.AsyncMessageCallback;
-import freenet.io.comm.DMT;
-import freenet.io.comm.NotConnectedException;
-import freenet.io.xfer.PacketThrottle;
-import freenet.support.DoublyLinkedList;
-import freenet.support.IndexableUpdatableSortedLinkedListItem;
-import freenet.support.LimitedRangeIntByteArrayMap;
-import freenet.support.LimitedRangeIntByteArrayMapElement;
-import freenet.support.Logger;
-import freenet.support.ReceivedPacketNumbers;
-import freenet.support.TimeUtil;
-import freenet.support.UpdatableSortedLinkedListItem;
-import freenet.support.UpdatableSortedLinkedListKilledException;
-import freenet.support.UpdatableSortedLinkedListWithForeignIndex;
-import freenet.support.WouldBlockException;
-import freenet.support.DoublyLinkedList.Item;

 /**
- * @author amphibian
- * 
- * Class to track everything related to a single key on a single
- * PeerNode. In particular, the key itself, packets sent and
- * received, and packet numbers.
+ * Class representing a single session key.
+ * @author Matthew Toseland <toad at amphibian.dyndns.org> (0xE43DA450)
  */
 public class KeyTracker {
-
+       
+       /** A PacketTracker may have more than one KeyTracker, but a KeyTracker 
+        * may only have one PacketTracker. In other words, in some cases it is
+        * possible to change the session key without invalidating the packet
+        * sequence, but it is never possible to invalidate the packet sequence
+        * without changing the session key. */
+       final PacketTracker packets;
+       
        private static boolean logMINOR;
        /** Parent PeerNode */
        public final PeerNode pn;
-       /** Are we the secondary key? */
-       private volatile boolean isDeprecated;
        /** Cipher to both encrypt outgoing packets with and decrypt
         * incoming ones. */
        public final BlockCipher sessionCipher;
        /** Key for above cipher, so far for debugging */
        public final byte[] sessionKey;
-       /** Packets we have sent to the node, minus those that have
-        * been acknowledged. */
-       private final LimitedRangeIntByteArrayMap sentPacketsContents;
-       /** Serial numbers of packets that we want to acknowledge,
-        * and when they become urgent. We always add to the end,
-        * and we always remove from the beginning, so should always
-        * be consistent. */
-       private final List<QueuedAck> ackQueue;
-       /** Serial numbers of packets that we have forgotten. Usually
-        * when we have forgotten a packet it just means that it has 
-        * been shifted to another KeyTracker because this one was
-        * deprecated; the messages will get through in the end.
-        */
-       private final List<QueuedForgotten> forgottenQueue;
-       /** The highest incoming serial number we have ever seen
-        * from the other side. Includes actual packets and resend
-        * requests (provided they are within range). */
-       private int highestSeenIncomingSerialNumber;
-       /** Serial numbers of packets we want to be resent by the
-        * other side to us, the time at which they become sendable,
-        * and the time at which they become urgent. In order of
-        * the latter. */
-       private final UpdatableSortedLinkedListWithForeignIndex 
resendRequestQueue;
-       /** Serial numbers of packets we want to be acknowledged by
-        * the other side, the time at which they become sendable,
-        * and the time at which they become urgent. In order of
-        * the latter. */
-       private final UpdatableSortedLinkedListWithForeignIndex ackRequestQueue;
-       /** Numbered packets that we need to send to the other side
-        * because they asked for them. Just contains the numbers. */
-       private final HashSet<Integer> packetsToResend;
-       /** Ranges of packet numbers we have received from the other
-        * side. */
-       private final ReceivedPacketNumbers packetNumbersReceived;
-       /** Counter to generate the next packet number */
-       private int nextPacketNumber;
-       final long createdTime;
-       /** The time at which we last successfully decoded a packet. */
-       private long timeLastDecodedPacket;

-       /** Everything is clear to start with */
-       KeyTracker(PeerNode pn, BlockCipher cipher, byte[] sessionKey) {
-               this.pn = pn;
+       KeyTracker(PeerNode parent, PacketTracker tracker, BlockCipher cipher, 
byte[] sessionKey) {
+               this.pn = parent;
+               this.packets = tracker;
                this.sessionCipher = cipher;
                this.sessionKey = sessionKey;
-               ackQueue = new LinkedList<QueuedAck>();
-               forgottenQueue = new LinkedList<QueuedForgotten>();
-               highestSeenIncomingSerialNumber = -1;
-               // give some leeway
-               sentPacketsContents = new LimitedRangeIntByteArrayMap(128);
-               resendRequestQueue = new 
UpdatableSortedLinkedListWithForeignIndex();
-               ackRequestQueue = new 
UpdatableSortedLinkedListWithForeignIndex();
-               packetsToResend = new HashSet<Integer>();
-               packetNumbersReceived = new ReceivedPacketNumbers(512);
-               isDeprecated = false;
-               nextPacketNumber = pn.node.random.nextInt(100 * 1000);
-               createdTime = System.currentTimeMillis();
-               logMINOR = Logger.shouldLog(Logger.MINOR, this);
        }

-       /**
-        * Set the deprecated flag to indicate that we are now
-        * no longer the primary key. And wake up any threads trying to lock
-        * a packet number; they can be sent with the new KT.
-        * 
-        * After this, new packets will not be sent. It will not be possible to 
allocate a new
-        * packet number. However, old resend requests etc may still be sent.
-        */
-       public void deprecated() {
-               logMINOR = Logger.shouldLog(Logger.MINOR, this);
-               isDeprecated = true;
-               sentPacketsContents.interrupt();
-       }
-
-       /**
-        * @return The highest received incoming serial number.
-        */
-       public int highestReceivedIncomingSeqNumber() {
-               return this.highestSeenIncomingSerialNumber;
-       }
-
-       /**
-        * Received this packet??
-        */
-       public boolean alreadyReceived(int seqNumber) {
-               return packetNumbersReceived.contains(seqNumber);
-       }
-
-       /** toString() - don't leak the key unless asked to */
-       @Override
-       public String toString() {
-               return super.toString() + " for " + pn.shortToString();
-       }
-
-       /**
-        * Queue an ack to be sent back to the node soon.
-        * @param seqNumber The number of the packet to be acked.
-        */
-       public void queueAck(int seqNumber) {
-               if(logMINOR)
-                       Logger.minor(this, "Queueing ack for " + seqNumber);
-               QueuedAck qa = new QueuedAck(seqNumber);
-               synchronized(ackQueue) {
-                       ackQueue.add(qa);
-               }
-       // Will go urgent in 200ms
-       }
-
-       public void queueForgotten(int seqNumber) {
-               queueForgotten(seqNumber, true);
-       }
-
-       public void queueForgotten(int seqNumber, boolean log) {
-               if(log && ((!isDeprecated) || logMINOR)) {
-                       String msg = "Queueing forgotten for " + seqNumber + " 
for " + this;
-                       if(!isDeprecated)
-                               Logger.error(this, msg);
-                       else
-                               Logger.minor(this, msg);
-               }
-               QueuedForgotten qf = new QueuedForgotten(seqNumber);
-               synchronized(forgottenQueue) {
-                       forgottenQueue.add(qf);
-               }
-       }
-
-       static class PacketActionItem { // anyone got a better name?
-
-               /** Packet sequence number */
-               int packetNumber;
-               /** Time at which this packet's ack or resend request becomes 
urgent
-                * and can trigger an otherwise empty packet to be sent. */
-               long urgentTime;
-
-               @Override
-               public String toString() {
-                       return super.toString() + ": packet " + packetNumber + 
" urgent@" + urgentTime + '(' + (System.currentTimeMillis() - urgentTime) + ')';
-               }
-       }
-
-       private final static class QueuedAck extends PacketActionItem {
-
-               QueuedAck(int packet) {
-                       long now = System.currentTimeMillis();
-                       packetNumber = packet;
-                       /** If not included on a packet in next 200ms, then
-                        * force a send of an otherwise empty packet.
-                        */
-                       urgentTime = now + 200;
-               }
-       }
-
-       // FIXME this is almost identical to QueuedAck, coalesce the classes
-       private final static class QueuedForgotten extends PacketActionItem {
-
-               QueuedForgotten(int packet) {
-                       long now = System.currentTimeMillis();
-                       packetNumber = packet;
-                       /** If not included on a packet in next 500ms, then
-                        * force a send of an otherwise empty packet.
-                        */
-                       urgentTime = now + 500;
-               }
-       }
-
-       private abstract class BaseQueuedResend extends PacketActionItem
-               implements IndexableUpdatableSortedLinkedListItem {
-
-               /** Time at which this item becomes sendable.
-                * When we send a resend request, this is reset to t+500ms.
-                * 
-                * Constraint: urgentTime is always greater than activeTime.
-                */
-               long activeTime;
-
-               void sent() throws UpdatableSortedLinkedListKilledException {
-                       long now = System.currentTimeMillis();
-                       activeTime = now + 500;
-                       urgentTime = activeTime + urgentDelay();
-               // This is only removed when we actually receive the packet
-               // But for now it will sleep
-               }
-
-               BaseQueuedResend(int packetNumber) {
-                       this.packetNumber = packetNumber;
-                       long now = System.currentTimeMillis();
-                       activeTime = initialActiveTime(now);
-                       urgentTime = activeTime + urgentDelay();
-               }
-
-               abstract long urgentDelay();
-
-               abstract long initialActiveTime(long now);
-               private Item next;
-               private Item prev;
-
-               public final Item getNext() {
-                       return next;
-               }
-
-               public final Item setNext(Item i) {
-                       Item old = next;
-                       next = i;
-                       return old;
-               }
-
-               public Item getPrev() {
-                       return prev;
-               }
-
-               public Item setPrev(Item i) {
-                       Item old = prev;
-                       prev = i;
-                       return old;
-               }
-
-               public int compareTo(Object o) {
-                       BaseQueuedResend r = (BaseQueuedResend) o;
-                       if(urgentTime > r.urgentTime)
-                               return 1;
-                       if(urgentTime < r.urgentTime)
-                               return -1;
-                       if(packetNumber > r.packetNumber)
-                               return 1;
-                       if(packetNumber < r.packetNumber)
-                               return -1;
-                       return 0;
-               }
-
-               public Object indexValue() {
-                       return packetNumber;
-               }
-               private DoublyLinkedList parent;
-
-               public DoublyLinkedList getParent() {
-                       return parent;
-               }
-
-               public DoublyLinkedList setParent(DoublyLinkedList l) {
-                       DoublyLinkedList old = parent;
-                       parent = l;
-                       return old;
-               }
-       }
-
-       private class QueuedResendRequest extends BaseQueuedResend {
-
-               @Override
-               long initialActiveTime( long now) {
-                       return now; // Active immediately; reordering is rare
-               }
-
-               QueuedResendRequest(int packetNumber) {
-                       super(packetNumber);
-               }
-
-               @Override
-               void sent() throws UpdatableSortedLinkedListKilledException {
-                       synchronized(resendRequestQueue) {
-                               super.sent();
-                               resendRequestQueue.update(this);
-                       }
-               }
-
-               @Override
-               long urgentDelay() {
-                       return PacketSender.MAX_COALESCING_DELAY; // Urgent 
pretty soon
-               }
-       }
-
-       private class QueuedAckRequest extends BaseQueuedResend {
-
-               final long createdTime;
-               long activeDelay;
-
-               @Override
-               long initialActiveTime( long now) {
-                       // Request an ack after four RTTs
-                       activeDelay = twoRTTs();
-                       return now + activeDelay;
-               }
-
-               QueuedAckRequest(int packetNumber) {
-                       super(packetNumber);
-                       this.createdTime = System.currentTimeMillis();
-               }
-
-               @Override
-               void sent() throws UpdatableSortedLinkedListKilledException {
-                       synchronized(ackRequestQueue) {
-                               super.sent();
-                               ackRequestQueue.update(this);
-                       }
-               }
-
-               /**
-                * Acknowledged.
-                */
-               public void onAcked() {
-                       long t = Math.max(0, System.currentTimeMillis() - 
createdTime);
-                       pn.reportPing(t);
-                       if(logMINOR)
-                               Logger.minor(this, "Reported round-trip time of 
" + TimeUtil.formatTime(t, 2, true) + " on " + pn.getPeer() + " (avg " + 
TimeUtil.formatTime((long) pn.averagePingTime(), 2, true) + ", #" + 
packetNumber + ')');
-               }
-
-               @Override
-               long urgentDelay() {
-                       return PacketSender.MAX_COALESCING_DELAY;
-               }
-       }
-
-       /**
-        * Called when we receive a packet.
-        * @param seqNumber The packet's serial number.
-        * See the comments in FNPPacketMangler.processOutgoing* for
-        * the reason for the locking.
-        */
-       public synchronized void receivedPacket(int seqNumber) {
-               timeLastDecodedPacket = System.currentTimeMillis();
-               logMINOR = Logger.shouldLog(Logger.MINOR, this);
-               if(logMINOR)
-                       Logger.minor(this, "Received packet " + seqNumber + " 
from " + pn.shortToString());
-               if(seqNumber == -1)
-                       return;
-               // FIXME delete this log statement
-               if(logMINOR)
-                       Logger.minor(this, "Still received packet: " + 
seqNumber);
-               // Received packet
-               receivedPacketNumber(seqNumber);
-               // Ack it even if it is a resend
-               queueAck(seqNumber);
-       }
-
-       // TCP uses four RTTs with no ack to resend ... but we have a more 
drawn out protocol, we
-       // should use only two.
-       public long twoRTTs() {
-               // FIXME upper bound necessary ?
-               return (long) Math.min(Math.max(250, pn.averagePingTime() * 2), 
2500);
-       }
-
-       protected void receivedPacketNumber(int seqNumber) {
-               if(logMINOR)
-                       Logger.minor(this, "Handling received packet number " + 
seqNumber);
-               queueResendRequests(seqNumber);
-               packetNumbersReceived.got(seqNumber);
-               try {
-                       removeResendRequest(seqNumber);
-               } catch(UpdatableSortedLinkedListKilledException e) {
-                       // Ignore, not our problem
-               }
-               synchronized(this) {
-                       highestSeenIncomingSerialNumber = 
Math.max(highestSeenIncomingSerialNumber, seqNumber);
-               }
-               if(logMINOR)
-                       Logger.minor(this, "Handled received packet number " + 
seqNumber);
-       }
-
-       /**
-        * Remove a resend request from the queue.
-        * @param seqNumber
-        * @throws UpdatableSortedLinkedListKilledException 
-        */
-       private void removeResendRequest(int seqNumber) throws 
UpdatableSortedLinkedListKilledException {
-               synchronized(resendRequestQueue) {
-                       resendRequestQueue.removeByKey(seqNumber);
-               }
-       }
-
-       /**
-        * Add some resend requests if necessary.
-        * @param seqNumber The number of the packet we just received.
-        */
-       private void queueResendRequests(int seqNumber) {
-               int max;
-               synchronized(this) {
-                       max = packetNumbersReceived.highest();
-               }
-               if(seqNumber > max)
-                       try {
-                               if((max != -1) && (seqNumber - max > 1)) {
-                                       if(logMINOR)
-                                               Logger.minor(this, "Queueing 
resends from " + max + " to " + seqNumber);
-                                       // Missed some packets out
-                                       for(int i = max + 1; i < seqNumber; 
i++) {
-                                               queueResendRequest(i);
-                                       }
-                               }
-                       } catch(UpdatableSortedLinkedListKilledException e) {
-                               // Ignore (we are decoding packet, not sending 
one)
-                       }
-       }
-
-       /**
-        * Queue a resend request
-        * @param packetNumber the packet serial number to queue a
-        * resend request for
-        * @throws UpdatableSortedLinkedListKilledException 
-        */
-       private void queueResendRequest(int packetNumber) throws 
UpdatableSortedLinkedListKilledException {
-               synchronized(resendRequestQueue) {
-                       if(queuedResendRequest(packetNumber)) {
-                               if(logMINOR)
-                                       Logger.minor(this, "Not queueing resend 
request for " + packetNumber + " - already queued");
-                               return;
-                       }
-                       if(logMINOR)
-                               Logger.minor(this, "Queueing resend request for 
" + packetNumber);
-                       QueuedResendRequest qrr = new 
QueuedResendRequest(packetNumber);
-                       resendRequestQueue.add(qrr);
-               }
-       }
-
-       /**
-        * Queue an ack request
-        * @param packetNumber the packet serial number to queue a
-        * resend request for
-        * @throws UpdatableSortedLinkedListKilledException 
-        */
-       private void queueAckRequest(int packetNumber) throws 
UpdatableSortedLinkedListKilledException {
-               synchronized(ackRequestQueue) {
-                       // FIXME should we just remove the existing ack 
request? If we do, we get a better
-                       // estimate of RTT on lossy links... if we don't, lossy 
links will include the average
-                       // time to send a packet including all resends. The 
latter may be useful, and in fact
-                       // the former is unreliable...
-                       if(queuedAckRequest(packetNumber)) {
-                               if(logMINOR)
-                                       Logger.minor(this, "Not queueing ack 
request for " + packetNumber + " - already queued");
-                               return;
-                       }
-                       if(logMINOR)
-                               Logger.minor(this, "Queueing ack request for " 
+ packetNumber + " on " + this);
-                       QueuedAckRequest qrr = new 
QueuedAckRequest(packetNumber);
-                       ackRequestQueue.add(qrr);
-               }
-       }
-
-       /**
-        * Is an ack request queued for this packet number?
-        */
-       private boolean queuedAckRequest(int packetNumber) {
-               synchronized(ackRequestQueue) {
-                       return ackRequestQueue.containsKey(packetNumber);
-               }
-       }
-
-       /**
-        * Is a resend request queued for this packet number?
-        */
-       private boolean queuedResendRequest(int packetNumber) {
-               synchronized(resendRequestQueue) {
-                       return resendRequestQueue.containsKey(packetNumber);
-               }
-       }
-
-       /**
-        * Called when we have received several packet acknowledgements.
-        * Synchronized for the same reason as the sender code is:
-        * So that we don't end up sending packets too late when overloaded,
-        * and get horrible problems such as asking to resend packets which
-        * haven't been sent yet.
-        */
-       public synchronized void acknowledgedPackets(int[] seqNos) {
-               AsyncMessageCallback[][] callbacks = new 
AsyncMessageCallback[seqNos.length][];
-               for(int i = 0; i < seqNos.length; i++) {
-                       int realSeqNo = seqNos[i];
-                       if(logMINOR)
-                               Logger.minor(this, "Acknowledged packet: " + 
realSeqNo);
-                       try {
-                               removeAckRequest(realSeqNo);
-                       } catch(UpdatableSortedLinkedListKilledException e) {
-                               // Ignore, we are processing an incoming packet
-                       }
-                       if(logMINOR)
-                               Logger.minor(this, "Removed ack request");
-                       callbacks[i] = 
sentPacketsContents.getCallbacks(realSeqNo);
-                       byte[] buf = sentPacketsContents.get(realSeqNo);
-                       long timeAdded = sentPacketsContents.getTime(realSeqNo);
-                       if(sentPacketsContents.remove(realSeqNo))
-                               if(buf.length > Node.PACKET_SIZE) {
-                                       PacketThrottle throttle = 
pn.getThrottle();
-                                       throttle.notifyOfPacketAcknowledged();
-                                       
throttle.setRoundTripTime(System.currentTimeMillis() - timeAdded);
-                               }
-               }
-               int cbCount = 0;
-               for(int i = 0; i < callbacks.length; i++) {
-                       AsyncMessageCallback[] cbs = callbacks[i];
-                       if(cbs != null)
-                               for(int j = 0; j < cbs.length; j++) {
-                                       cbs[j].acknowledged();
-                                       cbCount++;
-                               }
-               }
-               if(cbCount > 0 && logMINOR)
-                       Logger.minor(this, "Executed " + cbCount + " 
callbacks");
-               try {
-                       wouldBlock(true);
-               } catch(BlockedTooLongException e) {
-                       // Ignore, will come up again. In any case it's rather 
unlikely...
-               }
-       }
-
-       /**
-        * Called when we have received a packet acknowledgement.
-        * @param realSeqNo
-        */
-       public void acknowledgedPacket(int realSeqNo) {
-               logMINOR = Logger.shouldLog(Logger.MINOR, this);
-               AsyncMessageCallback[] callbacks;
-               if(logMINOR)
-                       Logger.minor(this, "Acknowledged packet: " + realSeqNo);
-               try {
-                       synchronized(this) {
-                               removeAckRequest(realSeqNo);
-                       }
-               } catch(UpdatableSortedLinkedListKilledException e) {
-                       // Ignore, we are processing an incoming packet
-               }
-               if(logMINOR)
-                       Logger.minor(this, "Removed ack request");
-               callbacks = sentPacketsContents.getCallbacks(realSeqNo);
-               byte[] buf = sentPacketsContents.get(realSeqNo);
-               long timeAdded = sentPacketsContents.getTime(realSeqNo);
-               if(sentPacketsContents.remove(realSeqNo))
-                       if(buf.length > Node.PACKET_SIZE) {
-                               PacketThrottle throttle = pn.getThrottle();
-                               throttle.notifyOfPacketAcknowledged();
-                               
throttle.setRoundTripTime(System.currentTimeMillis() - timeAdded);
-                       }
-               try {
-                       wouldBlock(true);
-               } catch(BlockedTooLongException e) {
-                       // Ignore, will come up again. In any case it's rather 
unlikely...
-               }
-               if(callbacks != null) {
-                       for(int i = 0; i < callbacks.length; i++)
-                               callbacks[i].acknowledged();
-                       if(logMINOR)
-                               Logger.minor(this, "Executed " + 
callbacks.length + " callbacks");
-               }
-       }
-
-       /**
-        * Remove an ack request from the queue by packet number.
-        * @throws UpdatableSortedLinkedListKilledException 
-        */
-       private void removeAckRequest(int seqNo) throws 
UpdatableSortedLinkedListKilledException {
-               QueuedAckRequest qr = null;
-
-               synchronized(ackRequestQueue) {
-                       qr = (QueuedAckRequest) 
ackRequestQueue.removeByKey(seqNo);
-               }
-               if(qr != null)
-                       qr.onAcked();
-               else
-                       Logger.normal(this, "Removing ack request twice? Null 
on " + seqNo + " from " + pn.getPeer() + " (" + TimeUtil.formatTime((int) 
pn.averagePingTime(), 2, true) + " ping avg)");
-       }
-
-       /**
-        * Resend (off-thread but ASAP) a packet.
-        * @param seqNumber The serial number of the packet to be
-        * resent.
-        */
-       public void resendPacket(int seqNumber) {
-               byte[] resendData = sentPacketsContents.get(seqNumber);
-               if(resendData != null) {
-                       if(resendData.length > Node.PACKET_SIZE)
-                               pn.getThrottle().notifyOfPacketLost();
-                       synchronized(packetsToResend) {
-                               packetsToResend.add(seqNumber);
-                       }
-                       pn.node.ps.wakeUp();
-               } else {
-                       String msg = "Asking me to resend packet " + seqNumber +
-                               " which we haven't sent yet or which they have 
already acked (next=" + nextPacketNumber + ')';
-                       // Might just be late, but could indicate something 
serious.
-                       if(isDeprecated) {
-                               if(logMINOR)
-                                       Logger.minor(this, "Other side wants us 
to resend packet " + seqNumber + " for " + this + " - we cannot do this because 
we are deprecated");
-                       } else
-                               Logger.normal(this, msg);
-               }
-       }
-
-       /**
-        * Called when we receive an AckRequest.
-        * @param packetNumber The packet that the other side wants
-        * us to re-ack.
-        */
-       public synchronized void receivedAckRequest(int packetNumber) {
-               if(queuedAck(packetNumber)) {
-                       // Already going to send an ack
-                       // Don't speed it up though; wasteful
-               } else if(packetNumbersReceived.contains(packetNumber))
-                       // We have received it, so send them an ack
-                       queueAck(packetNumber);
-               else {
-                       // We have not received it, so get them to resend it
-                       try {
-                               queueResendRequest(packetNumber);
-                       } catch(UpdatableSortedLinkedListKilledException e) {
-                               // Ignore, we are decoding, not sending.
-                               }
-                       highestSeenIncomingSerialNumber = 
Math.max(highestSeenIncomingSerialNumber, packetNumber);
-               }
-       }
-
-       /**
-        * Is there a queued ack with the given packet number?
-        * FIXME: have a hashtable? The others do, but probably it
-        * isn't necessary. We should be consistent about this -
-        * either take it out of UpdatableSortedLinkedListWithForeignIndex,
-        * or add one here.
-        */
-       private boolean queuedAck(int packetNumber) {
-               synchronized(ackQueue) {
-                       for(QueuedAck qa : ackQueue) {
-                               if(qa.packetNumber == packetNumber)
-                                       return true;
-                       }
-               }
-               return false;
-       }
-
-       /**
-        * Destination forgot a packet.
-        * This is normal if we are the secondary key.
-        * @param seqNumber The packet number lost.
-        */
-       public void destForgotPacket(int seqNumber) {
-               if(isDeprecated)
-                       Logger.normal(this, "Destination forgot packet: " + 
seqNumber);
-               else
-                       Logger.error(this, "Destination forgot packet: " + 
seqNumber);
-               synchronized(this) {
-                       try {
-                               removeResendRequest(seqNumber);
-                       } catch(UpdatableSortedLinkedListKilledException e) {
-                               // Ignore
-                       }
-               }
-       }
-
-       /**
-        * @return A packet number for a new outgoing packet.
-        * This method will block until one is available if
-        * necessary.
-        * @throws KeyChangedException if the thread is interrupted when waiting
-        */
-       public int allocateOutgoingPacketNumber() throws KeyChangedException, 
NotConnectedException {
-               int packetNumber;
-               if(!pn.isConnected())
-                       throw new NotConnectedException();
-               synchronized(this) {
-                       if(isDeprecated)
-                               throw new KeyChangedException();
-                       packetNumber = nextPacketNumber++;
-                       if(logMINOR)
-                               Logger.minor(this, "Allocated " + packetNumber 
+ " in allocateOutgoingPacketNumber for " + this);
-               }
-               while(true) {
-                       try {
-                               sentPacketsContents.lock(packetNumber);
-                               if(logMINOR)
-                                       Logger.minor(this, "Locked " + 
packetNumber);
-                               synchronized(this) {
-                                       timeWouldBlock = -1;
-                               }
-                               return packetNumber;
-                       } catch(InterruptedException e) {
-                               synchronized(this) {
-                                       if(isDeprecated)
-                                               throw new KeyChangedException();
-                               }
-                       }
-               }
-       }
-       private long timeWouldBlock = -1;
-       static final long MAX_WOULD_BLOCK_DELTA = 10 * 60 * 1000;
-
-       public boolean wouldBlock(boolean wakeTicker) throws 
BlockedTooLongException {
-               long now = System.currentTimeMillis();
-               synchronized(this) {
-                       if(sentPacketsContents.wouldBlock(nextPacketNumber)) {
-                               if(timeWouldBlock == -1)
-                                       timeWouldBlock = now;
-                               else {
-                                       long delta = now - timeWouldBlock;
-                                       if(delta > MAX_WOULD_BLOCK_DELTA) {
-                                               Logger.error(this, "Not been 
able to allocate a packet to tracker " + this + " for " + 
TimeUtil.formatTime(delta, 3, true));
-                                               throw new 
BlockedTooLongException(this, delta);
-                                       }
-                               }
-                               return true;
-                       } else
-                               if(timeWouldBlock != -1) {
-                                       long delta = now - timeWouldBlock;
-                                       timeWouldBlock = -1;
-                                       if(delta > 
PacketSender.MAX_COALESCING_DELAY)
-                                               Logger.error(this, "Waking 
PacketSender: have been blocking for packet ack for " + 
TimeUtil.formatTime(delta, 3, true));
-                                       else
-                                               return false;
-                               } else
-                                       return false;
-               }
-               pn.node.ps.wakeUp();
-               return false;
-       }
-
-       /**
-        * @return A packet number for a new outgoing packet.
-        * This method will not block, and will throw an exception
-        * if it would need to block.
-        * @throws KeyChangedException if the thread is interrupted when waiting
-        */
-       public int allocateOutgoingPacketNumberNeverBlock() throws 
KeyChangedException, NotConnectedException, WouldBlockException {
-               int packetNumber;
-               if(!pn.isConnected())
-                       throw new NotConnectedException();
-               synchronized(this) {
-                       packetNumber = nextPacketNumber;
-                       if(isDeprecated)
-                               throw new KeyChangedException();
-                       sentPacketsContents.lockNeverBlock(packetNumber);
-                       timeWouldBlock = -1;
-                       nextPacketNumber = packetNumber + 1;
-                       if(logMINOR)
-                               Logger.minor(this, "Allocated " + packetNumber 
+ " in allocateOutgoingPacketNumberNeverBlock for " + this);
-                       return packetNumber;
-               }
-       }
-
-       public int[] grabForgotten() {
-               if(logMINOR)
-                       Logger.minor(this, "Grabbing forgotten packet numbers");
-               int[] acks;
-               synchronized(forgottenQueue) {
-                       // Grab the acks and tell them they are sent
-                       int length = forgottenQueue.size();
-                       acks = new int[length];
-                       int i = 0;
-
-                       Iterator it = forgottenQueue.iterator();
-                       while(it.hasNext()) {
-                               QueuedForgotten ack = (QueuedForgotten) 
it.next();
-                               acks[i++] = ack.packetNumber;
-                               if(logMINOR)
-                                       Logger.minor(this, "Grabbing ack " + 
ack.packetNumber + " from " + this);
-                               it.remove();    // sent
-                       }
-               }
-               return acks;
-       }
-
-       public void requeueForgot(int[] forgotPackets, int start, int length) {
-               synchronized(forgottenQueue) { // It doesn't do anything else 
does it? REDFLAG
-                       for(int i = start; i < start + length; i++) {
-                               queueForgotten(i, false);
-                       }
-               }
-       }
-
-       /**
-        * Grab all the currently queued acks to be sent to this node.
-        * @return An array of packet numbers that we need to acknowledge.
-        */
-       public int[] grabAcks() {
-               if(logMINOR)
-                       Logger.minor(this, "Grabbing acks");
-               int[] acks;
-               synchronized(ackQueue) {
-                       // Grab the acks and tell them they are sent
-                       int length = ackQueue.size();
-                       acks = new int[length];
-                       int i = 0;
-                       Iterator<QueuedAck> it = ackQueue.iterator();
-                       while(it.hasNext()) {
-                               QueuedAck ack = it.next();
-                               acks[i++] = ack.packetNumber;
-                               if(logMINOR)
-                                       Logger.minor(this, "Grabbing ack " + 
ack.packetNumber + " from " + this);
-                               it.remove();    // sent
-                       }
-               }
-               return acks;
-       }
-
-       /**
-        * Grab all the currently queued resend requests.
-        * @return An array of the packet numbers of all the packets we want to 
be resent.
-        * @throws NotConnectedException If the peer is no longer connected.
-        */
-       public int[] grabResendRequests() throws NotConnectedException {
-               UpdatableSortedLinkedListItem[] items;
-               int[] packetNumbers;
-               int realLength;
-               long now = System.currentTimeMillis();
-               try {
-                       synchronized(resendRequestQueue) {
-                               items = resendRequestQueue.toArray();
-                               int length = items.length;
-                               packetNumbers = new int[length];
-                               realLength = 0;
-                               for(int i = 0; i < length; i++) {
-                                       QueuedResendRequest qrr = 
(QueuedResendRequest) items[i];
-                                       
if(packetNumbersReceived.contains(qrr.packetNumber)) {
-                                               if(logMINOR)
-                                                       Logger.minor(this, 
"Have already seen " + qrr.packetNumber + ", removing from resend list");
-                                               resendRequestQueue.remove(qrr);
-                                               continue;
-                                       }
-                                       if(qrr.activeTime <= now) {
-                                               packetNumbers[realLength++] = 
qrr.packetNumber;
-                                               if(logMINOR)
-                                                       Logger.minor(this, 
"Grabbing resend request: " + qrr.packetNumber + " from " + this);
-                                               qrr.sent();
-                                       } else if(logMINOR)
-                                               Logger.minor(this, "Rejecting 
resend request: " + qrr.packetNumber + " - in future by " + (qrr.activeTime - 
now) + "ms for " + this);
-                               }
-                       }
-               } catch(UpdatableSortedLinkedListKilledException e) {
-                       throw new NotConnectedException();
-               }
-               int[] trimmedPacketNumbers = new int[realLength];
-               System.arraycopy(packetNumbers, 0, trimmedPacketNumbers, 0, 
realLength);
-               return trimmedPacketNumbers;
-       }
-
-       public int[] grabAckRequests() throws NotConnectedException, 
StillNotAckedException {
-               UpdatableSortedLinkedListItem[] items;
-               int[] packetNumbers;
-               int realLength;
-               if(logMINOR)
-                       Logger.minor(this, "Grabbing ack requests");
-               try {
-                       synchronized(ackRequestQueue) {
-                               long now = System.currentTimeMillis();
-                               items = ackRequestQueue.toArray();
-                               int length = items.length;
-                               packetNumbers = new int[length];
-                               realLength = 0;
-                               for(int i = 0; i < length; i++) {
-                                       QueuedAckRequest qr = 
(QueuedAckRequest) items[i];
-                                       int packetNumber = qr.packetNumber;
-                                       if(qr.activeTime <= now) {
-                                               
if(sentPacketsContents.get(packetNumber) == null) {
-                                                       if(logMINOR)
-                                                               
Logger.minor(this, "Asking to ack packet which has already been acked: " + 
packetNumber + " on " + this + ".grabAckRequests");
-                                                       
ackRequestQueue.remove(qr);
-                                                       continue;
-                                               }
-                                               if(now - qr.createdTime > 2 * 
60 * 1000) {
-                                                       if(logMINOR)
-                                                               
Logger.minor(this, "Packet " + qr.packetNumber + " sent over " + (now - 
qr.createdTime) + "ms ago and still not acked on " + this + " for " + pn);
-                                                       if(now - qr.createdTime 
> 10 * 60 * 1000) {
-                                                               
Logger.error(this, "Packet " + qr.packetNumber + " sent over " + (now - 
qr.createdTime) + "ms ago and still not acked on " + this + " for " + pn);
-                                                               throw new 
StillNotAckedException(this);
-                                                       }
-                                               }
-                                               packetNumbers[realLength++] = 
packetNumber;
-                                               if(logMINOR)
-                                                       Logger.minor(this, 
"Grabbing ack request " + packetNumber + " (" + realLength + ") from " + this);
-                                               qr.sent();
-                                       } else if(logMINOR)
-                                               Logger.minor(this, "Ignoring 
ack request " + packetNumber + " (" + realLength + ") - will become active in " 
+ (qr.activeTime - now) + "ms on " + this + " - " + qr);
-                               }
-                       }
-               } catch(UpdatableSortedLinkedListKilledException e) {
-                       throw new NotConnectedException();
-               }
-               if(logMINOR)
-                       Logger.minor(this, "realLength now " + realLength);
-               int[] trimmedPacketNumbers = new int[realLength];
-               System.arraycopy(packetNumbers, 0, trimmedPacketNumbers, 0, 
realLength);
-               if(logMINOR)
-                       Logger.minor(this, "Returning " + 
trimmedPacketNumbers.length + " ackRequests");
-               return trimmedPacketNumbers;
-       }
-
-       /**
-        * @return The time at which we will have to send some
-        * notifications. Or Long.MAX_VALUE if there are none to send.
-        */
-       public long getNextUrgentTime() {
-               long earliestTime = Long.MAX_VALUE;
-               synchronized(ackQueue) {
-                       if(!ackQueue.isEmpty()) {
-                               QueuedAck qa = ackQueue.get(0);
-                               earliestTime = qa.urgentTime;
-                       }
-               }
-               synchronized(resendRequestQueue) {
-                       if(!resendRequestQueue.isEmpty()) {
-                               QueuedResendRequest qr = (QueuedResendRequest) 
resendRequestQueue.getLowest();
-                               earliestTime = Math.min(earliestTime, 
qr.urgentTime);
-                       }
-               }
-               synchronized(ackRequestQueue) {
-                       if(!ackRequestQueue.isEmpty()) {
-                               QueuedAckRequest qr = (QueuedAckRequest) 
ackRequestQueue.getLowest();
-                               earliestTime = Math.min(earliestTime, 
qr.urgentTime);
-                       }
-               }
-               return earliestTime;
-       }
-
-       /**
-        * @return The last sent new packet number.
-        */
-       public int getLastOutgoingSeqNumber() {
-               synchronized(this) {
-                       return nextPacketNumber - 1;
-               }
-       }
-
-       /**
-        * Report a packet has been sent
-        * @param data The data we have just sent (payload only, decrypted). 
-        * @param seqNumber The packet number.
-        * @throws NotConnectedException 
-        */
-       public void sentPacket(byte[] data, int seqNumber, 
AsyncMessageCallback[] callbacks, short priority) throws NotConnectedException {
-               if(callbacks != null)
-                       for(int i = 0; i < callbacks.length; i++) {
-                               if(callbacks[i] == null)
-                                       throw new NullPointerException();
-                       }
-               sentPacketsContents.add(seqNumber, data, callbacks, priority);
-               try {
-                       queueAckRequest(seqNumber);
-               } catch(UpdatableSortedLinkedListKilledException e) {
-                       throw new NotConnectedException();
-               }
-       }
-
-       /**
-        * Clear the KeyTracker. Depreciate it, clear all resend, ack, 
request-ack etc queues.
-        * Return the messages we still had in flight. The caller will then 
either add them to
-        * another KeyTracker, or call their callbacks to indicate failure.
-        */
-       private LimitedRangeIntByteArrayMapElement[] clear() {
-               if(logMINOR)
-                       Logger.minor(this, "Clearing " + this);
-               isDeprecated = true;
-               LimitedRangeIntByteArrayMapElement[] elements;
-               synchronized(sentPacketsContents) {
-                       elements = sentPacketsContents.grabAll(); // will clear
-               }
-               synchronized(ackQueue) {
-                       ackQueue.clear();
-               }
-               synchronized(resendRequestQueue) {
-                       resendRequestQueue.kill();
-               }
-               synchronized(ackRequestQueue) {
-                       ackRequestQueue.kill();
-               }
-               synchronized(packetsToResend) {
-                       packetsToResend.clear();
-               }
-               packetNumbersReceived.clear();
-               return elements;
-       }
-
-       /**
-        * Completely deprecate the KeyTracker, in favour of a new one. 
-        * It will no longer be used for anything. The KeyTracker will be 
cleared and all outstanding packets
-        * moved to the new KeyTracker.
-        * 
-        * *** Must only be called if the KeyTracker is not to be kept. 
Otherwise, we may receive some packets twice. ***
-        */
-       public void completelyDeprecated(KeyTracker newTracker) {
-               if(logMINOR)
-                       Logger.minor(this, "Completely deprecated: " + this + " 
in favour of " + newTracker);
-               LimitedRangeIntByteArrayMapElement[] elements = clear();
-               if(elements.length == 0)
-                       return; // nothing more to do
-               MessageItem[] messages = new MessageItem[elements.length];
-               for(int i = 0; i < elements.length; i++) {
-                       LimitedRangeIntByteArrayMapElement element = 
elements[i];
-                       byte[] buf = element.data;
-                       AsyncMessageCallback[] callbacks = element.callbacks;
-                       // Ignore packet#
-                       if(logMINOR)
-                               Logger.minor(this, "Queueing resend of what was 
once " + element.packetNumber);
-                       messages[i] = new MessageItem(buf, callbacks, true, 
pn.resendByteCounter, element.priority);
-               }
-               pn.requeueMessageItems(messages, 0, messages.length, true);
-
-               pn.node.ps.wakeUp();
-       }
-
-       /**
-        * Called when the node appears to have been disconnected.
-        * Dump all sent messages.
-        */
-       public void disconnected() {
-               // Clear everything, call the callbacks
-               LimitedRangeIntByteArrayMapElement[] elements = clear();
-               for(int i = 0; i < elements.length; i++) {
-                       LimitedRangeIntByteArrayMapElement element = 
elements[i];
-                       AsyncMessageCallback[] callbacks = element.callbacks;
-                       if(callbacks != null)
-                               for(int j = 0; j < callbacks.length; j++)
-                                       callbacks[j].disconnected();
-               }
-       }
-
-       /**
-        * Fill rpiTemp with ResendPacketItems of packets that need to be
-        * resent.
-        * @return An array of integers which contains the packet numbers
-        * to be resent (the RPI's are put into rpiTemp), or null if there
-        * are no packets to resend.
-        * 
-        * Not a very nice API, but it saves a load of allocations, and at
-        * least it's documented!
-        */
-       public int[] grabResendPackets(Vector<ResendPacketItem> rpiTemp, int[] 
numbers) {
-               rpiTemp.clear();
-               long now = System.currentTimeMillis();
-               long fourRTTs = twoRTTs();
-               int count = 0;
-               synchronized(packetsToResend) {
-                       int len = packetsToResend.size();
-                       if(numbers.length < len)
-                               numbers = new int[len * 2];
-                       for(Iterator<Integer> it = packetsToResend.iterator(); 
it.hasNext();) {
-                               int packetNo = it.next();
-                               long resentTime = 
sentPacketsContents.getReaddedTime(packetNo);
-                               if(now - resentTime > fourRTTs) {
-                                       // Either never resent, or resent at 
least 4 RTTs ago
-                                       numbers[count++] = packetNo;
-                                       it.remove();
-                               }
-                       }
-                       packetsToResend.clear();
-               }
-               for(int i = 0; i < count; i++) {
-                       int packetNo = numbers[i];
-                       byte[] buf = sentPacketsContents.get(packetNo);
-                       if(buf == null) {
-                               if(logMINOR)
-                                       Logger.minor(this, "Contents null for " 
+ packetNo + " in grabResendPackets on " + this);
-                               continue; // acked already?
-                       }
-                       AsyncMessageCallback[] callbacks = 
sentPacketsContents.getCallbacks(packetNo);
-                       short priority = 
sentPacketsContents.getPriority(packetNo, DMT.PRIORITY_BULK_DATA);
-                       rpiTemp.add(new ResendPacketItem(buf, packetNo, this, 
callbacks, priority));
-               }
-               if(rpiTemp.isEmpty())
-                       return null;
-               return numbers;
-       }
-
-       public boolean hasPacketsToResend() {
-               synchronized(packetsToResend) {
-                       return !packetsToResend.isEmpty();
-               }
-       }
-
-       public boolean isDeprecated() {
-               return this.isDeprecated;
-       }
-
-       public int countAckRequests() {
-               synchronized(ackRequestQueue) {
-                       return ackRequestQueue.size();
-               }
-       }
-
-       public int countResendRequests() {
-               synchronized(resendRequestQueue) {
-                       return resendRequestQueue.size();
-               }
-       }
-
-       public int countAcks() {
-               synchronized(ackQueue) {
-                       return ackQueue.size();
-               }
-       }
-
-       public synchronized long timeLastDecodedPacket() {
-               return timeLastDecodedPacket;
-       }
 }

Modified: trunk/freenet/src/freenet/node/OutgoingPacketMangler.java
===================================================================
--- trunk/freenet/src/freenet/node/OutgoingPacketMangler.java   2008-12-02 13:48:21 UTC (rev 24011)
+++ trunk/freenet/src/freenet/node/OutgoingPacketMangler.java   2008-12-02 13:53:48 UTC (rev 24012)
@@ -34,8 +34,9 @@

        /**
         * Resend a single packet.
+        * @param kt The KeyTracker on which to send the packet.
         */
-       public void resend(ResendPacketItem item) throws 
PacketSequenceException, WouldBlockException, KeyChangedException, 
NotConnectedException;
+       public void resend(ResendPacketItem item, KeyTracker kt) throws 
PacketSequenceException, WouldBlockException, KeyChangedException, 
NotConnectedException;

        /**
         * Build a packet and send it. From a Message recently converted into 
byte[],

Modified: trunk/freenet/src/freenet/node/PeerNode.java
===================================================================
--- trunk/freenet/src/freenet/node/PeerNode.java        2008-12-02 13:48:21 UTC (rev 24011)
+++ trunk/freenet/src/freenet/node/PeerNode.java        2008-12-02 13:53:48 UTC (rev 24012)
@@ -1005,7 +1005,7 @@
        public boolean isConnected() {
                long now = System.currentTimeMillis();
                synchronized(this) {
-                       if(isConnected && currentTracker != null && 
!currentTracker.isDeprecated()) {
+                       if(isConnected && currentTracker != null && 
!currentTracker.packets.isDeprecated()) {
                                timeLastConnected = now;
                                return true;
                        }
@@ -1168,9 +1168,9 @@
                                mi.onDisconnect();
                        }
                }
-               if(cur != null) cur.disconnected();
-               if(prev != null) prev.disconnected();
-               if(unv != null) unv.disconnected();
+               if(cur != null) cur.packets.disconnected();
+               if(prev != null) prev.packets.disconnected();
+               if(unv != null) unv.packets.disconnected();
        if(_lastThrottle != null)
                _lastThrottle.maybeDisconnected();
                node.lm.lostOrRestartedNode(this);
@@ -1268,22 +1268,22 @@
                }
                KeyTracker kt = cur;
                if(kt != null) {
-                       long next = kt.getNextUrgentTime();
+                       long next = kt.packets.getNextUrgentTime();
                        t = Math.min(t, next);
                        if(next < now && logMINOR)
                                Logger.minor(this, "Next urgent time from 
curTracker less than now");
-                       if(kt.hasPacketsToResend()) return now;
+                       if(kt.packets.hasPacketsToResend()) return now;
                }
                kt = prev;
                if(kt != null) {
-                       long next = kt.getNextUrgentTime();
+                       long next = kt.packets.getNextUrgentTime();
                        t = Math.min(t, next);
                        if(next < now && logMINOR)
                                Logger.minor(this, "Next urgent time from 
prevTracker less than now");
-                       if(kt.hasPacketsToResend()) return now;
+                       if(kt.packets.hasPacketsToResend()) return now;
                }
                try {
-                       if(cur != null && !cur.wouldBlock(false))
+                       if(cur != null && !cur.packets.wouldBlock(false))
                                t = messageQueue.getNextUrgentTime(t, now);
                        // If there isn't a current tracker, no point worrying 
about it as we won't be able to send it anyway...
                } catch (BlockedTooLongException e) {
@@ -1295,11 +1295,11 @@
        private synchronized boolean mustSendNotificationsNow(long now) {
                KeyTracker kt = currentTracker;
                if(kt != null) {
-                       if(kt.getNextUrgentTime() < now) return true;
+                       if(kt.packets.getNextUrgentTime() < now) return true;
                }
                kt = previousTracker;
                if(kt != null)
-                       if(kt.getNextUrgentTime() < now) return true;
+                       if(kt.packets.getNextUrgentTime() < now) return true;
                return false;
        }

@@ -1852,7 +1852,8 @@
                        routable = false;
                } else
                        older = false;
-               KeyTracker newTracker = new KeyTracker(this, encCipher, encKey);
+               PacketTracker packets = new PacketTracker(this);
+               KeyTracker newTracker = new KeyTracker(this, packets, 
encCipher, encKey);
                if(logMINOR) Logger.minor(this, "New key tracker in 
completedHandshake: "+newTracker+" for "+shortToString());
                changedIP(replyTo);
                boolean bootIDChanged = false;
@@ -1904,7 +1905,7 @@
                                                previousTracker = 
unverifiedTracker;
                                }
                                unverifiedTracker = newTracker;
-                               if(currentTracker == null || 
currentTracker.isDeprecated())
+                               if(currentTracker == null || 
currentTracker.packets.isDeprecated())
                                        isConnected = false;
                        } else {
                                prev = currentTracker;
@@ -1941,11 +1942,11 @@
                        node.usm.onRestart(this);
                }
                if(oldPrev != null)
-                       oldPrev.completelyDeprecated(newTracker);
+                       oldPrev.packets.completelyDeprecated(newTracker);
                if(oldCur != null)
-                       oldCur.completelyDeprecated(newTracker);
+                       oldCur.packets.completelyDeprecated(newTracker);
                if(prev != null)
-                       prev.deprecated();
+                       prev.packets.deprecated();
        PacketThrottle throttle;
        synchronized(this) {
                throttle = _lastThrottle;
@@ -1978,8 +1979,8 @@
         */
        private synchronized void maybeSwapTrackers() {
                if(currentTracker == null || previousTracker == null) return;
-               long delta = Math.abs(currentTracker.createdTime - 
previousTracker.createdTime);
-               if(previousTracker != null && (!previousTracker.isDeprecated()) 
&&
+               long delta = Math.abs(currentTracker.packets.createdTime - 
previousTracker.packets.createdTime);
+               if(previousTracker != null && 
(!previousTracker.packets.isDeprecated()) &&
                                delta < CHECK_FOR_SWAPPED_TRACKERS_INTERVAL) {
                        // Swap prev and current iff H(new key) > H(old key).
                        // To deal with race conditions (node A gets 1 current 
2 prev, node B gets 2 current 1 prev; when we rekey we lose data and cause 
problems).
@@ -2012,7 +2013,7 @@
                } else {
                        if (logMINOR)
                                Logger.minor(this, "Not swapping KeyTracker's: 
previousTracker = " + previousTracker.toString()
-                                       + (previousTracker.isDeprecated() ? " 
(deprecated)" : "") + " time delta = " + delta);
+                                       + 
(previousTracker.packets.isDeprecated() ? " (deprecated)" : "") + " time delta 
= " + delta);
                }
        }

@@ -2066,7 +2067,7 @@
                synchronized(this) {
                        if(sentInitialMessages)
                                return;
-                       if(currentTracker != null && 
!currentTracker.isDeprecated()) // FIXME is that possible?
+                       if(currentTracker != null && 
!currentTracker.packets.isDeprecated()) // FIXME is that possible?
                                sentInitialMessages = true;
                        else
                                return;
@@ -2147,7 +2148,7 @@
                long now = System.currentTimeMillis();
                KeyTracker completelyDeprecatedTracker;
                synchronized(this) {
-                       if(tracker == unverifiedTracker && 
!tracker.isDeprecated()) {
+                       if(tracker == unverifiedTracker && 
!tracker.packets.isDeprecated()) {
                                if(logMINOR)
                                        Logger.minor(this, "Promoting 
unverified tracker " + tracker + " for " + getPeer());
                                completelyDeprecatedTracker = previousTracker;
@@ -2160,7 +2161,7 @@
                                ctx = null;
                                maybeSwapTrackers();
                                if(previousTracker != null)
-                                       previousTracker.deprecated();
+                                       previousTracker.packets.deprecated();
                        } else
                                return;
                }
@@ -2168,7 +2169,7 @@
                setPeerNodeStatus(now);
                node.peers.addConnectedPeer(this);
                if(completelyDeprecatedTracker != null)
-                       
completelyDeprecatedTracker.completelyDeprecated(tracker);
+                       
completelyDeprecatedTracker.packets.completelyDeprecated(tracker);
        }

        private synchronized boolean invalidVersion() {
@@ -2434,7 +2435,7 @@
                }
                KeyTracker tracker = cur;
                if(tracker != null) {
-                       long t = tracker.getNextUrgentTime();
+                       long t = tracker.packets.getNextUrgentTime();
                        if(t < now || forceSendPrimary) {
                                try {
                                        if(logMINOR) Logger.minor(this, 
"Sending urgent notifications for current tracker on "+shortToString());
@@ -2454,7 +2455,7 @@
                }
                tracker = prev;
                if(tracker != null) {
-                       long t = tracker.getNextUrgentTime();
+                       long t = tracker.packets.getNextUrgentTime();
                        if(t < now)
                                try {
                                        if(logMINOR) Logger.minor(this, 
"Sending urgent notifications for previous tracker on "+shortToString());
@@ -2484,13 +2485,13 @@
                        cur = currentTracker;
                        prev = previousTracker;
                }
-               long t = prev.getNextUrgentTime();
-               if(!(t > -1 && prev.timeLastDecodedPacket() > 0 && (now - 
prev.timeLastDecodedPacket()) > 60*1000 && 
-                               cur.timeLastDecodedPacket() > 0 && (now - 
cur.timeLastDecodedPacket() < 30*1000) && 
-                               (prev.countAckRequests() > 0 || 
prev.countResendRequests() > 0)))
+               long t = prev.packets.getNextUrgentTime();
+               if(!(t > -1 && prev.packets.timeLastDecodedPacket() > 0 && (now 
- prev.packets.timeLastDecodedPacket()) > 60*1000 && 
+                               cur.packets.timeLastDecodedPacket() > 0 && (now 
- cur.packets.timeLastDecodedPacket() < 30*1000) && 
+                               (prev.packets.countAckRequests() > 0 || 
prev.packets.countResendRequests() > 0)))
                        return;
                Logger.error(this, "No packets decoded on "+prev+" for 60 
seconds, deprecating in favour of cur: "+cur);
-               prev.completelyDeprecated(cur);
+               prev.packets.completelyDeprecated(cur);
        }

        /**
@@ -2658,18 +2659,18 @@
                        if(item.pn != this)
                                throw new IllegalArgumentException("item.pn != 
this!");
                        KeyTracker kt = cur;
-                       if((kt != null) && (item.kt == kt)) {
-                               kt.resendPacket(item.packetNumber);
+                       if((kt != null) && (item.kt == kt.packets)) {
+                               kt.packets.resendPacket(item.packetNumber);
                                continue;
                        }
                        kt = prev;
-                       if((kt != null) && (item.kt == kt)) {
-                               kt.resendPacket(item.packetNumber);
+                       if((kt != null) && (item.kt == kt.packets)) {
+                               kt.packets.resendPacket(item.packetNumber);
                                continue;
                        }
                        kt = unv;
-                       if((kt != null) && (item.kt == kt)) {
-                               kt.resendPacket(item.packetNumber);
+                       if((kt != null) && (item.kt == kt.packets)) {
+                               kt.packets.resendPacket(item.packetNumber);
                                continue;
                        }
                        // Doesn't match any of these, need to resend the data
@@ -3155,16 +3156,16 @@
                if(isConnected) {
                        if(currentTracker == null) {
                                if(unverifiedTracker != null) {
-                                       if(unverifiedTracker.isDeprecated())
+                                       
if(unverifiedTracker.packets.isDeprecated())
                                                Logger.error(this, "Connected 
but primary tracker is null and unverified is deprecated ! " + 
unverifiedTracker + " for " + this, new Exception("debug"));
                                        else if(logMINOR)
                                                Logger.minor(this, "Connected 
but primary tracker is null, but unverified = " + unverifiedTracker + " for " + 
this, new Exception("debug"));
                                } else {
                                        Logger.error(this, "Connected but both 
primary and unverified are null on " + this, new Exception("debug"));
                                }
-                       } else if(currentTracker.isDeprecated()) {
+                       } else if(currentTracker.packets.isDeprecated()) {
                                if(unverifiedTracker != null) {
-                                       if(unverifiedTracker.isDeprecated())
+                                       
if(unverifiedTracker.packets.isDeprecated())
                                                Logger.error(this, "Connected 
but primary tracker is deprecated, unverified is deprecated: primary=" + 
currentTracker + " unverified: " + unverifiedTracker + " for " + this, new 
Exception("debug"));
                                        else if(logMINOR)
                                                Logger.minor(this, "Connected, 
primary tracker deprecated, unverified is valid, " + unverifiedTracker + " for 
" + this, new Exception("debug"));
@@ -4063,7 +4064,7 @@
                                kt = getPreviousKeyTracker();
                        if(kt == null)
                                continue;
-                       int[] tmp = kt.grabResendPackets(rpiTemp, rpiIntTemp);
+                       int[] tmp = kt.packets.grabResendPackets(rpiTemp, 
rpiIntTemp);
                        if(tmp == null)
                                continue;
                        rpiIntTemp = tmp;
@@ -4073,7 +4074,7 @@
                                try {
                                        if(logMINOR)
                                                Logger.minor(this, "Resending " 
+ item.packetNumber + " to " + item.kt);
-                                       getOutgoingMangler().resend(item);
+                                       getOutgoingMangler().resend(item, kt);
                                        return true;
                                } catch(KeyChangedException e) {
                                        Logger.error(this, "Caught " + e + " 
resending packets to " + kt);

Modified: trunk/freenet/src/freenet/node/ResendPacketItem.java
===================================================================
--- trunk/freenet/src/freenet/node/ResendPacketItem.java        2008-12-02 13:48:21 UTC (rev 24011)
+++ trunk/freenet/src/freenet/node/ResendPacketItem.java        2008-12-02 13:53:48 UTC (rev 24012)
@@ -10,7 +10,7 @@
  * message as byte[].
  */
 public class ResendPacketItem {
-    public ResendPacketItem(byte[] payload, int packetNumber, KeyTracker k, 
AsyncMessageCallback[] callbacks, short priority) {
+    public ResendPacketItem(byte[] payload, int packetNumber, PacketTracker k, 
AsyncMessageCallback[] callbacks, short priority) {
         pn = k.pn;
         kt = k;
         buf = payload;
@@ -19,7 +19,7 @@
         this.priority = priority;
     }
     final PeerNode pn;
-    final KeyTracker kt;
+    final PacketTracker kt;
     final byte[] buf;
     final int packetNumber;
     final AsyncMessageCallback[] callbacks;

Modified: trunk/freenet/src/freenet/node/StillNotAckedException.java
===================================================================
--- trunk/freenet/src/freenet/node/StillNotAckedException.java  2008-12-02 13:48:21 UTC (rev 24011)
+++ trunk/freenet/src/freenet/node/StillNotAckedException.java  2008-12-02 13:53:48 UTC (rev 24012)
@@ -12,10 +12,10 @@
  */
 public class StillNotAckedException extends Exception {

-       public StillNotAckedException(KeyTracker tracker) {
+       public StillNotAckedException(PacketTracker tracker) {
                this.tracker = tracker;
        }

-       final KeyTracker tracker;
+       final PacketTracker tracker;

 }


Reply via email to