Author: toad
Date: 2008-12-02 13:55:19 +0000 (Tue, 02 Dec 2008)
New Revision: 24013

Added:
   trunk/freenet/src/freenet/node/PacketTracker.java
Log:
Doh!


Added: trunk/freenet/src/freenet/node/PacketTracker.java
===================================================================
--- trunk/freenet/src/freenet/node/PacketTracker.java                           
(rev 0)
+++ trunk/freenet/src/freenet/node/PacketTracker.java   2008-12-02 13:55:19 UTC 
(rev 24013)
@@ -0,0 +1,1126 @@
+/* This code is part of Freenet. It is distributed under the GNU General
+ * Public License, version 2 (or at your option any later version). See
+ * http://www.gnu.org/ for further details of the GPL. */
+package freenet.node;
+
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Vector;
+
+import freenet.io.comm.AsyncMessageCallback;
+import freenet.io.comm.DMT;
+import freenet.io.comm.NotConnectedException;
+import freenet.io.xfer.PacketThrottle;
+import freenet.support.DoublyLinkedList;
+import freenet.support.IndexableUpdatableSortedLinkedListItem;
+import freenet.support.LimitedRangeIntByteArrayMap;
+import freenet.support.LimitedRangeIntByteArrayMapElement;
+import freenet.support.Logger;
+import freenet.support.ReceivedPacketNumbers;
+import freenet.support.TimeUtil;
+import freenet.support.UpdatableSortedLinkedListItem;
+import freenet.support.UpdatableSortedLinkedListKilledException;
+import freenet.support.UpdatableSortedLinkedListWithForeignIndex;
+import freenet.support.WouldBlockException;
+import freenet.support.DoublyLinkedList.Item;
+
+/**
+ * @author amphibian
+ * 
+ * Class to track retransmissions, acknowledgements, packet numbers, etc.
+ * May be shared by more than one KeyTracker (aka session key).
+ */
+public class PacketTracker {
+
+       private static boolean logMINOR; // cached Logger.shouldLog(Logger.MINOR, this); refreshed in the constructor and at several entry points
+       /** Parent PeerNode */
+       public final PeerNode pn;
+       /** Are we the secondary key? Set to true by deprecated(); checked
+        * before a new outgoing packet number is allocated. */
+       private volatile boolean isDeprecated;
+       /** Packets we have sent to the node, minus those that have
+        * been acknowledged. */
+       private final LimitedRangeIntByteArrayMap sentPacketsContents;
+       /** Serial numbers of packets that we want to acknowledge,
+        * and when they become urgent. We always add to the end,
+        * and we always remove from the beginning, so should always
+        * be consistent. Guarded by synchronizing on the list itself. */
+       private final List<QueuedAck> ackQueue;
+       /** Serial numbers of packets that we have forgotten. Usually
+        * when we have forgotten a packet it just means that it has 
+        * been shifted to another KeyTracker because this one was
+        * deprecated; the messages will get through in the end.
+        * Guarded by synchronizing on the list itself.
+        */
+       private final List<QueuedForgotten> forgottenQueue;
+       /** The highest incoming serial number we have ever seen
+        * from the other side. Includes actual packets and resend
+        * requests (provided they are within range). Starts at -1. */
+       private int highestSeenIncomingSerialNumber;
+       /** Serial numbers of packets we want to be resent by the
+        * other side to us, the time at which they become sendable,
+        * and the time at which they become urgent. In order of
+        * the latter. */
+       private final UpdatableSortedLinkedListWithForeignIndex 
resendRequestQueue;
+       /** Serial numbers of packets we want to be acknowledged by
+        * the other side, the time at which they become sendable,
+        * and the time at which they become urgent. In order of
+        * the latter. */
+       private final UpdatableSortedLinkedListWithForeignIndex ackRequestQueue;
+       /** Numbered packets that we need to send to the other side
+        * because they asked for them. Just contains the numbers. */
+       private final HashSet<Integer> packetsToResend;
+       /** Ranges of packet numbers we have received from the other
+        * side. */
+       private final ReceivedPacketNumbers packetNumbersReceived;
+       /** Counter to generate the next packet number. Seeded in the
+        * constructor with a random value below 100000. */
+       private int nextPacketNumber;
+       final long createdTime; // System.currentTimeMillis() at construction
+       /** The time at which we last successfully decoded a packet. */
+       private long timeLastDecodedPacket;
+
+       /**
+        * Create a tracker for the given peer. All queues start empty, no
+        * incoming packet has been seen yet, and the first outgoing packet
+        * number is picked at random.
+        * @param pn The peer this tracker belongs to.
+        */
+       PacketTracker(PeerNode pn) {
+               this.pn = pn;
+
+               // Nothing received yet, and we start out as the primary tracker.
+               highestSeenIncomingSerialNumber = -1;
+               isDeprecated = false;
+
+               // Plain FIFO queues.
+               ackQueue = new LinkedList<QueuedAck>();
+               forgottenQueue = new LinkedList<QueuedForgotten>();
+
+               // Sorted queues, indexed by packet number.
+               resendRequestQueue = new UpdatableSortedLinkedListWithForeignIndex();
+               ackRequestQueue = new UpdatableSortedLinkedListWithForeignIndex();
+
+               // Sent-packet window (128 gives some leeway) and received-number ranges.
+               sentPacketsContents = new LimitedRangeIntByteArrayMap(128);
+               packetNumbersReceived = new ReceivedPacketNumbers(512);
+               packetsToResend = new HashSet<Integer>();
+
+               nextPacketNumber = pn.node.random.nextInt(100 * 1000);
+               createdTime = System.currentTimeMillis();
+               logMINOR = Logger.shouldLog(Logger.MINOR, this);
+       }
+
+       /**
+        * Mark this tracker as no longer belonging to the primary key, and
+        * interrupt any threads waiting to lock a packet number so that they
+        * can be sent with the new KT instead.
+        *
+        * From now on no new packet number can be allocated here and no new
+        * packets will be sent, although old resend requests etc may still go
+        * out.
+        */
+       public void deprecated() {
+               isDeprecated = true;
+               logMINOR = Logger.shouldLog(Logger.MINOR, this);
+               // Must come after the flag is set: woken threads re-check isDeprecated.
+               sentPacketsContents.interrupt();
+       }
+
+       /**
+        * @return The highest incoming serial number recorded so far
+        * (-1 until one has been recorded).
+        */
+       public int highestReceivedIncomingSeqNumber() {
+               return highestSeenIncomingSerialNumber;
+       }
+
+       /**
+        * Has a packet with this sequence number already been received?
+        * @param seqNumber The sequence number to look up.
+        * @return True if the number is in the set of received packet numbers.
+        */
+       public boolean alreadyReceived(int seqNumber) {
+               final boolean seen = packetNumbersReceived.contains(seqNumber);
+               return seen;
+       }
+
+       /** Default object identity plus the peer's short name; never includes the key. */
+       @Override
+       public String toString() {
+               final String identity = super.toString();
+               return identity + " for " + pn.shortToString();
+       }
+
+       /**
+        * Schedule an acknowledgement for a received packet.
+        * @param seqNumber The number of the packet to be acked.
+        */
+       public void queueAck(int seqNumber) {
+               if(logMINOR) {
+                       Logger.minor(this, "Queueing ack for " + seqNumber);
+               }
+               // The QueuedAck constructor stamps the entry to go urgent in 200ms.
+               final QueuedAck entry = new QueuedAck(seqNumber);
+               synchronized(ackQueue) {
+                       ackQueue.add(entry);
+               }
+       }
+
+       /**
+        * Queue a "forgotten packet" notice for the given sequence number,
+        * with logging enabled.
+        * @param seqNumber The number of the packet we have forgotten.
+        */
+       public void queueForgotten(int seqNumber) {
+               final boolean log = true;
+               queueForgotten(seqNumber, log);
+       }
+
+       /**
+        * Queue a "forgotten packet" notice for the given sequence number.
+        * @param seqNumber The number of the packet we have forgotten.
+        * @param log If true, report the event: as an error while this tracker
+        * is still primary, or at MINOR level (when enabled) once deprecated.
+        */
+       public void queueForgotten(int seqNumber, boolean log) {
+               if(log) {
+                       if(!isDeprecated || logMINOR) {
+                               String msg = "Queueing forgotten for " + seqNumber + " for " + this;
+                               if(isDeprecated) {
+                                       Logger.minor(this, msg);
+                               } else {
+                                       Logger.error(this, msg);
+                               }
+                       }
+               }
+               final QueuedForgotten entry = new QueuedForgotten(seqNumber);
+               synchronized(forgottenQueue) {
+                       forgottenQueue.add(entry);
+               }
+       }
+
+       static class PacketActionItem { // anyone got a better name?
+
+               /** Packet sequence number */
+               int packetNumber;
+               /** Time at which this packet's ack or resend request becomes urgent
+                * and can trigger an otherwise empty packet to be sent. */
+               long urgentTime;
+
+               /** Identity, packet number, urgent time and how far past (or before) it we are. */
+               @Override
+               public String toString() {
+                       StringBuilder sb = new StringBuilder(super.toString());
+                       sb.append(": packet ").append(packetNumber);
+                       sb.append(" urgent@").append(urgentTime);
+                       sb.append('(').append(System.currentTimeMillis() - urgentTime).append(')');
+                       return sb.toString();
+               }
+       }
+
+       /** An acknowledgement waiting to be sent. */
+       private final static class QueuedAck extends PacketActionItem {
+
+               /**
+                * @param packet Number of the packet being acknowledged.
+                */
+               QueuedAck(int packet) {
+                       packetNumber = packet;
+                       // If not included on a packet in the next 200ms, force a
+                       // send of an otherwise empty packet.
+                       urgentTime = System.currentTimeMillis() + 200;
+               }
+       }
+
+       // FIXME this is almost identical to QueuedAck, coalesce the classes
+       /** A "forgotten packet" notice waiting to be sent. */
+       private final static class QueuedForgotten extends PacketActionItem {
+
+               /**
+                * @param packet Number of the packet that was forgotten.
+                */
+               QueuedForgotten(int packet) {
+                       packetNumber = packet;
+                       // If not included on a packet in the next 500ms, force a
+                       // send of an otherwise empty packet.
+                       urgentTime = System.currentTimeMillis() + 500;
+               }
+       }
+
+       /**
+        * Shared base for entries of the two sorted request queues. Holds the
+        * active/urgent timestamps, the linked-list plumbing, and the ordering
+        * (by urgent time, then by packet number).
+        */
+       private abstract class BaseQueuedResend extends PacketActionItem
+               implements IndexableUpdatableSortedLinkedListItem {
+
+               /** Time at which this item becomes sendable.
+                * When we send a resend request, this is reset to t+500ms.
+                *
+                * Constraint: urgentTime is always greater than activeTime.
+                */
+               long activeTime;
+               /** List plumbing, only touched through the accessors below. */
+               private Item next;
+               private Item prev;
+               private DoublyLinkedList parent;
+
+               /**
+                * @param packetNumber The packet this entry refers to.
+                */
+               BaseQueuedResend(int packetNumber) {
+                       this.packetNumber = packetNumber;
+                       // Both hooks run before the subclass constructor body does.
+                       activeTime = initialActiveTime(System.currentTimeMillis());
+                       urgentTime = activeTime + urgentDelay();
+               }
+
+               /** @return Gap between becoming sendable and becoming urgent. */
+               abstract long urgentDelay();
+
+               /** @return The first time at which this entry may be sent, given the current time. */
+               abstract long initialActiveTime(long now);
+
+               /**
+                * The request has just been sent: put it to sleep for 500ms.
+                * It is only removed when we actually receive the packet.
+                */
+               void sent() throws UpdatableSortedLinkedListKilledException {
+                       activeTime = System.currentTimeMillis() + 500;
+                       urgentTime = activeTime + urgentDelay();
+               }
+
+               /** Order by urgent time, breaking ties on packet number. */
+               public int compareTo(Object o) {
+                       BaseQueuedResend other = (BaseQueuedResend) o;
+                       if(urgentTime != other.urgentTime) {
+                               return urgentTime > other.urgentTime ? 1 : -1;
+                       }
+                       if(packetNumber != other.packetNumber) {
+                               return packetNumber > other.packetNumber ? 1 : -1;
+                       }
+                       return 0;
+               }
+
+               /** @return The packet number, used as the foreign index key. */
+               public Object indexValue() {
+                       return packetNumber;
+               }
+
+               public final Item getNext() {
+                       return next;
+               }
+
+               public final Item setNext(Item i) {
+                       final Item previous = next;
+                       next = i;
+                       return previous;
+               }
+
+               public Item getPrev() {
+                       return prev;
+               }
+
+               public Item setPrev(Item i) {
+                       final Item previous = prev;
+                       prev = i;
+                       return previous;
+               }
+
+               public DoublyLinkedList getParent() {
+                       return parent;
+               }
+
+               public DoublyLinkedList setParent(DoublyLinkedList l) {
+                       final DoublyLinkedList previous = parent;
+                       parent = l;
+                       return previous;
+               }
+       }
+
+       /** A request, queued in resendRequestQueue, for the other side to resend a packet to us. */
+       private class QueuedResendRequest extends BaseQueuedResend {
+
+               QueuedResendRequest(int packetNumber) {
+                       super(packetNumber);
+               }
+
+               /** Active immediately; reordering is rare. */
+               @Override
+               long initialActiveTime(long now) {
+                       return now;
+               }
+
+               /** Urgent pretty soon. */
+               @Override
+               long urgentDelay() {
+                       return PacketSender.MAX_COALESCING_DELAY;
+               }
+
+               /** Reschedule, then re-sort this entry within resendRequestQueue. */
+               @Override
+               void sent() throws UpdatableSortedLinkedListKilledException {
+                       synchronized(resendRequestQueue) {
+                               super.sent();
+                               resendRequestQueue.update(this);
+                       }
+               }
+       }
+
+       /** A request, queued in ackRequestQueue, for the other side to acknowledge a packet we sent. */
+       private class QueuedAckRequest extends BaseQueuedResend {
+
+               /** When this request was created; used to measure the round trip. */
+               final long createdTime;
+               /** Delay chosen by initialActiveTime(). Deliberately has no initializer:
+                * it is assigned while the superclass constructor runs, and an
+                * initializer would overwrite that value afterwards. */
+               long activeDelay;
+
+               QueuedAckRequest(int packetNumber) {
+                       super(packetNumber);
+                       this.createdTime = System.currentTimeMillis();
+               }
+
+               /** Request an ack after the delay given by twoRTTs(). */
+               @Override
+               long initialActiveTime(long now) {
+                       activeDelay = twoRTTs();
+                       return now + activeDelay;
+               }
+
+               @Override
+               long urgentDelay() {
+                       return PacketSender.MAX_COALESCING_DELAY;
+               }
+
+               /** Reschedule, then re-sort this entry within ackRequestQueue. */
+               @Override
+               void sent() throws UpdatableSortedLinkedListKilledException {
+                       synchronized(ackRequestQueue) {
+                               super.sent();
+                               ackRequestQueue.update(this);
+                       }
+               }
+
+               /**
+                * Acknowledged. Report the time since creation (never negative)
+                * to the peer as a ping sample.
+                */
+               public void onAcked() {
+                       long elapsed = System.currentTimeMillis() - createdTime;
+                       long t = elapsed < 0 ? 0 : elapsed;
+                       pn.reportPing(t);
+                       if(logMINOR) {
+                               Logger.minor(this, "Reported round-trip time of " + TimeUtil.formatTime(t, 2, true) + " on " + pn.getPeer() + " (avg " + TimeUtil.formatTime((long) pn.averagePingTime(), 2, true) + ", #" + packetNumber + ')');
+                       }
+               }
+       }
+
+       /**
+        * Called when we receive a packet. Records the decode time, then,
+        * unless the packet is unnumbered (-1), registers the number as
+        * received and queues an ack for it.
+        * @param seqNumber The packet's serial number.
+        * See the comments in FNPPacketMangler.processOutgoing* for
+        * the reason for the locking.
+        */
+       public synchronized void receivedPacket(int seqNumber) {
+               timeLastDecodedPacket = System.currentTimeMillis();
+               logMINOR = Logger.shouldLog(Logger.MINOR, this);
+               if(logMINOR) {
+                       Logger.minor(this, "Received packet " + seqNumber + " from " + pn.shortToString());
+               }
+               if(seqNumber != -1) {
+                       // FIXME delete this log statement
+                       if(logMINOR) {
+                               Logger.minor(this, "Still received packet: " + seqNumber);
+                       }
+                       receivedPacketNumber(seqNumber);
+                       // Ack it even if it is a resend
+                       queueAck(seqNumber);
+               }
+       }
+
+       /**
+        * TCP uses four RTTs with no ack to resend ... but we have a more
+        * drawn out protocol, so we should use only two.
+        * @return Twice the peer's average ping time, kept within 250..2500.
+        */
+       public long twoRTTs() {
+               // FIXME upper bound necessary ?
+               return (long) Math.max(250, Math.min(pn.averagePingTime() * 2, 2500));
+       }
+
+       /**
+        * Bookkeeping for a received packet number: request resends for any
+        * gap before it, mark it received, drop any resend request we had
+        * queued for it, and raise the highest-seen number if needed.
+        * @param seqNumber The packet number just received.
+        */
+       protected void receivedPacketNumber(int seqNumber) {
+               if(logMINOR) {
+                       Logger.minor(this, "Handling received packet number " + seqNumber);
+               }
+               // Gap detection must run before the number is recorded as received.
+               queueResendRequests(seqNumber);
+               packetNumbersReceived.got(seqNumber);
+               try {
+                       removeResendRequest(seqNumber);
+               } catch(UpdatableSortedLinkedListKilledException e) {
+                       // Ignore, not our problem
+               }
+               synchronized(this) {
+                       if(seqNumber > highestSeenIncomingSerialNumber) {
+                               highestSeenIncomingSerialNumber = seqNumber;
+                       }
+               }
+               if(logMINOR) {
+                       Logger.minor(this, "Handled received packet number " + seqNumber);
+               }
+       }
+
+       /**
+        * Drop the queued resend request for a packet number, if any.
+        * @param seqNumber The packet number whose request should go.
+        * @throws UpdatableSortedLinkedListKilledException as declared by the
+        * queue's removeByKey().
+        */
+       private void removeResendRequest(int seqNumber) throws UpdatableSortedLinkedListKilledException {
+               synchronized(resendRequestQueue) {
+                       resendRequestQueue.removeByKey(seqNumber);
+               }
+       }
+
+       /**
+        * Add some resend requests if necessary: when the packet just
+        * received skips past the highest number received so far, ask for
+        * every number in between.
+        * @param seqNumber The number of the packet we just received.
+        */
+       private void queueResendRequests(int seqNumber) {
+               final int highest;
+               synchronized(this) {
+                       highest = packetNumbersReceived.highest();
+               }
+               if(seqNumber <= highest) {
+                       return;
+               }
+               // Nothing received before (-1), or no gap: nothing to request.
+               if(highest == -1 || seqNumber - highest <= 1) {
+                       return;
+               }
+               try {
+                       if(logMINOR) {
+                               Logger.minor(this, "Queueing resends from " + highest + " to " + seqNumber);
+                       }
+                       // Missed some packets out
+                       int missing = highest + 1;
+                       while(missing < seqNumber) {
+                               queueResendRequest(missing);
+                               missing++;
+                       }
+               } catch(UpdatableSortedLinkedListKilledException e) {
+                       // Ignore (we are decoding packet, not sending one)
+               }
+       }
+
+       /**
+        * Queue a resend request, unless one is already queued for this
+        * packet number.
+        * @param packetNumber the packet serial number to queue a
+        * resend request for
+        * @throws UpdatableSortedLinkedListKilledException as declared by the
+        * queue's add().
+        */
+       private void queueResendRequest(int packetNumber) throws UpdatableSortedLinkedListKilledException {
+               synchronized(resendRequestQueue) {
+                       if(!queuedResendRequest(packetNumber)) {
+                               if(logMINOR) {
+                                       Logger.minor(this, "Queueing resend request for " + packetNumber);
+                               }
+                               resendRequestQueue.add(new QueuedResendRequest(packetNumber));
+                       } else if(logMINOR) {
+                               Logger.minor(this, "Not queueing resend request for " + packetNumber + " - already queued");
+                       }
+               }
+       }
+
+       /**
+        * Queue an ack request, unless one is already queued for this
+        * packet number.
+        * @param packetNumber the packet serial number to queue an
+        * ack request for
+        * @throws UpdatableSortedLinkedListKilledException as declared by the
+        * queue's add().
+        */
+       private void queueAckRequest(int packetNumber) throws UpdatableSortedLinkedListKilledException {
+               synchronized(ackRequestQueue) {
+                       // FIXME should we just remove the existing ack request? If we do, we get a better
+                       // estimate of RTT on lossy links... if we don't, lossy links will include the average
+                       // time to send a packet including all resends. The latter may be useful, and in fact
+                       // the former is unreliable...
+                       if(!queuedAckRequest(packetNumber)) {
+                               if(logMINOR) {
+                                       Logger.minor(this, "Queueing ack request for " + packetNumber + " on " + this);
+                               }
+                               ackRequestQueue.add(new QueuedAckRequest(packetNumber));
+                       } else if(logMINOR) {
+                               Logger.minor(this, "Not queueing ack request for " + packetNumber + " - already queued");
+                       }
+               }
+       }
+
+       /**
+        * @param packetNumber The packet number to look up.
+        * @return True if ackRequestQueue holds an entry keyed by this number.
+        */
+       private boolean queuedAckRequest(int packetNumber) {
+               final boolean queued;
+               synchronized(ackRequestQueue) {
+                       queued = ackRequestQueue.containsKey(packetNumber);
+               }
+               return queued;
+       }
+
+       /**
+        * @param packetNumber The packet number to look up.
+        * @return True if resendRequestQueue holds an entry keyed by this number.
+        */
+       private boolean queuedResendRequest(int packetNumber) {
+               final boolean queued;
+               synchronized(resendRequestQueue) {
+                       queued = resendRequestQueue.containsKey(packetNumber);
+               }
+               return queued;
+       }
+
+       /**
+        * Called when we have received several packet acknowledgements.
+        * For each number: drop its ack request, collect its callbacks, remove
+        * the stored packet, and feed the throttle if the packet was larger
+        * than Node.PACKET_SIZE. Callbacks run only after every number has
+        * been processed.
+        *
+        * Synchronized for the same reason as the sender code is:
+        * So that we don't end up sending packets too late when overloaded,
+        * and get horrible problems such as asking to resend packets which
+        * haven't been sent yet.
+        * @param seqNos The acknowledged packet numbers.
+        */
+       public synchronized void acknowledgedPackets(int[] seqNos) {
+               final int count = seqNos.length;
+               AsyncMessageCallback[][] callbacks = new AsyncMessageCallback[count][];
+               for(int idx = 0; idx < count; idx++) {
+                       final int realSeqNo = seqNos[idx];
+                       if(logMINOR) {
+                               Logger.minor(this, "Acknowledged packet: " + realSeqNo);
+                       }
+                       try {
+                               removeAckRequest(realSeqNo);
+                       } catch(UpdatableSortedLinkedListKilledException e) {
+                               // Ignore, we are processing an incoming packet
+                       }
+                       if(logMINOR) {
+                               Logger.minor(this, "Removed ack request");
+                       }
+                       // Read everything we need before the entry is removed.
+                       callbacks[idx] = sentPacketsContents.getCallbacks(realSeqNo);
+                       byte[] payload = sentPacketsContents.get(realSeqNo);
+                       long sentAt = sentPacketsContents.getTime(realSeqNo);
+                       boolean removed = sentPacketsContents.remove(realSeqNo);
+                       if(removed && payload.length > Node.PACKET_SIZE) {
+                               PacketThrottle throttle = pn.getThrottle();
+                               throttle.notifyOfPacketAcknowledged();
+                               throttle.setRoundTripTime(System.currentTimeMillis() - sentAt);
+                       }
+               }
+               int executed = 0;
+               for(AsyncMessageCallback[] perPacket : callbacks) {
+                       if(perPacket == null) {
+                               continue;
+                       }
+                       for(AsyncMessageCallback cb : perPacket) {
+                               cb.acknowledged();
+                               executed++;
+                       }
+               }
+               if(logMINOR && executed > 0) {
+                       Logger.minor(this, "Executed " + executed + " callbacks");
+               }
+               try {
+                       wouldBlock(true);
+               } catch(BlockedTooLongException e) {
+                       // Ignore, will come up again. In any case it's rather unlikely...
+               }
+       }
+
+       /**
+        * Called when we have received a packet acknowledgement. Drops the ack
+        * request, removes the stored packet, feeds the throttle if the packet
+        * was larger than Node.PACKET_SIZE, and finally runs the packet's
+        * callbacks.
+        * @param realSeqNo The acknowledged packet number.
+        */
+       public void acknowledgedPacket(int realSeqNo) {
+               logMINOR = Logger.shouldLog(Logger.MINOR, this);
+               if(logMINOR) {
+                       Logger.minor(this, "Acknowledged packet: " + realSeqNo);
+               }
+               try {
+                       synchronized(this) {
+                               removeAckRequest(realSeqNo);
+                       }
+               } catch(UpdatableSortedLinkedListKilledException e) {
+                       // Ignore, we are processing an incoming packet
+               }
+               if(logMINOR) {
+                       Logger.minor(this, "Removed ack request");
+               }
+               // Read everything we need before the entry is removed.
+               final AsyncMessageCallback[] callbacks = sentPacketsContents.getCallbacks(realSeqNo);
+               byte[] payload = sentPacketsContents.get(realSeqNo);
+               long sentAt = sentPacketsContents.getTime(realSeqNo);
+               boolean removed = sentPacketsContents.remove(realSeqNo);
+               if(removed && payload.length > Node.PACKET_SIZE) {
+                       PacketThrottle throttle = pn.getThrottle();
+                       throttle.notifyOfPacketAcknowledged();
+                       throttle.setRoundTripTime(System.currentTimeMillis() - sentAt);
+               }
+               try {
+                       wouldBlock(true);
+               } catch(BlockedTooLongException e) {
+                       // Ignore, will come up again. In any case it's rather unlikely...
+               }
+               if(callbacks == null) {
+                       return;
+               }
+               for(AsyncMessageCallback cb : callbacks) {
+                       cb.acknowledged();
+               }
+               if(logMINOR) {
+                       Logger.minor(this, "Executed " + callbacks.length + " callbacks");
+               }
+       }
+
+       /**
+        * Remove an ack request from the queue by packet number and, if one
+        * was there, notify it that the ack arrived. A missing entry is logged.
+        * @param seqNo The packet number that was acknowledged.
+        * @throws UpdatableSortedLinkedListKilledException as declared by the
+        * queue's removeByKey().
+        */
+       private void removeAckRequest(int seqNo) throws UpdatableSortedLinkedListKilledException {
+               final QueuedAckRequest removed;
+               synchronized(ackRequestQueue) {
+                       removed = (QueuedAckRequest) ackRequestQueue.removeByKey(seqNo);
+               }
+               if(removed == null) {
+                       Logger.normal(this, "Removing ack request twice? Null on " + seqNo + " from " + pn.getPeer() + " (" + TimeUtil.formatTime((int) pn.averagePingTime(), 2, true) + " ping avg)");
+                       return;
+               }
+               // Outside the queue lock.
+               removed.onAcked();
+       }
+
+       /**
+        * Resend (off-thread but ASAP) a packet. If we still hold the packet
+        * it is added to packetsToResend and the packet sender is woken;
+        * otherwise the request is only logged.
+        * @param seqNumber The serial number of the packet to be
+        * resent.
+        */
+       public void resendPacket(int seqNumber) {
+               final byte[] resendData = sentPacketsContents.get(seqNumber);
+               if(resendData == null) {
+                       String msg = "Asking me to resend packet " + seqNumber +
+                               " which we haven't sent yet or which they have already acked (next=" + nextPacketNumber + ')';
+                       // Might just be late, but could indicate something serious.
+                       if(!isDeprecated) {
+                               Logger.normal(this, msg);
+                       } else if(logMINOR) {
+                               Logger.minor(this, "Other side wants us to resend packet " + seqNumber + " for " + this + " - we cannot do this because we are deprecated");
+                       }
+                       return;
+               }
+               // Only packets above Node.PACKET_SIZE are reported to the throttle as lost.
+               if(resendData.length > Node.PACKET_SIZE) {
+                       pn.getThrottle().notifyOfPacketLost();
+               }
+               synchronized(packetsToResend) {
+                       packetsToResend.add(seqNumber);
+               }
+               pn.node.ps.wakeUp();
+       }
+
+       /**
+        * Called when we receive an AckRequest. Does nothing if an ack is
+        * already queued, queues an ack if we did receive the packet, and
+        * otherwise asks for a resend and updates the highest-seen number.
+        * @param packetNumber The packet that the other side wants
+        * us to re-ack.
+        */
+       public synchronized void receivedAckRequest(int packetNumber) {
+               if(queuedAck(packetNumber)) {
+                       // Already going to send an ack
+                       // Don't speed it up though; wasteful
+                       return;
+               }
+               if(packetNumbersReceived.contains(packetNumber)) {
+                       // We have received it, so send them an ack
+                       queueAck(packetNumber);
+                       return;
+               }
+               // We have not received it, so get them to resend it
+               try {
+                       queueResendRequest(packetNumber);
+               } catch(UpdatableSortedLinkedListKilledException e) {
+                       // Ignore, we are decoding, not sending.
+               }
+               if(packetNumber > highestSeenIncomingSerialNumber) {
+                       highestSeenIncomingSerialNumber = packetNumber;
+               }
+       }
+
+       /**
+        * Is there a queued ack with the given packet number? Scans the
+        * ack queue linearly.
+        * FIXME: have a hashtable? The others do, but probably it
+        * isn't necessary. We should be consistent about this -
+        * either take it out of UpdatableSortedLinkedListWithForeignIndex,
+        * or add one here.
+        */
+       private boolean queuedAck(int packetNumber) {
+               synchronized(ackQueue) {
+                       Iterator<QueuedAck> it = ackQueue.iterator();
+                       while(it.hasNext()) {
+                               if(it.next().packetNumber == packetNumber) {
+                                       return true;
+                               }
+                       }
+                       return false;
+               }
+       }
+
+       /**
+        * Destination forgot a packet, so stop asking it to resend that one.
+        * This is normal if we are the secondary key, and is logged as an
+        * error otherwise.
+        * @param seqNumber The packet number lost.
+        */
+       public void destForgotPacket(int seqNumber) {
+               final String msg = "Destination forgot packet: " + seqNumber;
+               if(isDeprecated) {
+                       Logger.normal(this, msg);
+               } else {
+                       Logger.error(this, msg);
+               }
+               synchronized(this) {
+                       try {
+                               removeResendRequest(seqNumber);
+                       } catch(UpdatableSortedLinkedListKilledException e) {
+                               // Ignore
+                       }
+               }
+       }
+
+       /**
+        * Allocate the next outgoing packet number and lock its slot in
+        * sentPacketsContents.
+        * @return A packet number for a new outgoing packet.
+        * This method will block until one is available if
+        * necessary.
+        * @throws KeyChangedException if this tracker is already deprecated
+        * when the number would be allocated, or if the thread is interrupted
+        * when waiting and the tracker is deprecated by then. An interrupt
+        * while not deprecated just retries the lock.
+        * @throws NotConnectedException if the peer is not connected.
+        */
+       public int allocateOutgoingPacketNumber() throws KeyChangedException, 
NotConnectedException {
+               int packetNumber;
+               if(!pn.isConnected())
+                       throw new NotConnectedException();
+               synchronized(this) {
+                       if(isDeprecated)
+                               throw new KeyChangedException();
+                       packetNumber = nextPacketNumber++; // number is consumed even if locking below fails
+                       if(logMINOR)
+                               Logger.minor(this, "Allocated " + packetNumber 
+ " in allocateOutgoingPacketNumber for " + this);
+               }
+               while(true) { // retry the lock until it succeeds or we turn out to be deprecated
+                       try {
+                               sentPacketsContents.lock(packetNumber);
+                               if(logMINOR)
+                                       Logger.minor(this, "Locked " + 
packetNumber);
+                               synchronized(this) {
+                                       timeWouldBlock = -1; // no longer blocked: clear the marker kept by wouldBlock()
+                               }
+                               return packetNumber;
+                       } catch(InterruptedException e) { // presumably raised via deprecated(), which calls sentPacketsContents.interrupt()
+                               synchronized(this) {
+                                       if(isDeprecated)
+                                               throw new KeyChangedException();
+                               }
+                       }
+               }
+       }
+       private long timeWouldBlock = -1; // Time (ms) at which wouldBlock() first saw that allocation would block; -1 when not blocked.
+       static final long MAX_WOULD_BLOCK_DELTA = 10 * 60 * 1000; // 10 minutes in ms; wouldBlock() throws BlockedTooLongException beyond this.
+
+       public boolean wouldBlock(boolean wakeTicker) throws 
BlockedTooLongException {
+               long now = System.currentTimeMillis();
+               synchronized(this) {
+                       if(sentPacketsContents.wouldBlock(nextPacketNumber)) {
+                               if(timeWouldBlock == -1)
+                                       timeWouldBlock = now;
+                               else {
+                                       long delta = now - timeWouldBlock;
+                                       if(delta > MAX_WOULD_BLOCK_DELTA) {
+                                               Logger.error(this, "Not been 
able to allocate a packet to tracker " + this + " for " + 
TimeUtil.formatTime(delta, 3, true));
+                                               throw new 
BlockedTooLongException(this, delta);
+                                       }
+                               }
+                               return true;
+                       } else
+                               if(timeWouldBlock != -1) {
+                                       long delta = now - timeWouldBlock;
+                                       timeWouldBlock = -1;
+                                       if(delta > 
PacketSender.MAX_COALESCING_DELAY)
+                                               Logger.error(this, "Waking 
PacketSender: have been blocking for packet ack for " + 
TimeUtil.formatTime(delta, 3, true));
+                                       else
+                                               return false;
+                               } else
+                                       return false;
+               }
+               pn.node.ps.wakeUp();
+               return false;
+       }
+
+       /**
+        * Non-blocking variant of allocateOutgoingPacketNumber().
+        * nextPacketNumber is only advanced once the number has been locked.
+        * @return A packet number for a new outgoing packet.
+        * @throws KeyChangedException If this tracker is deprecated.
+        * @throws NotConnectedException If the peer is not connected on entry.
+        * @throws WouldBlockException If the number cannot be locked without
+        * blocking (propagated from sentPacketsContents.lockNeverBlock).
+        */
+       public int allocateOutgoingPacketNumberNeverBlock() throws KeyChangedException, NotConnectedException, WouldBlockException {
+               if(!pn.isConnected())
+                       throw new NotConnectedException();
+               synchronized(this) {
+                       if(isDeprecated)
+                               throw new KeyChangedException();
+                       final int candidate = nextPacketNumber;
+                       sentPacketsContents.lockNeverBlock(candidate);
+                       // Locked successfully: clear the blocked timer and consume the number.
+                       timeWouldBlock = -1;
+                       nextPacketNumber = candidate + 1;
+                       if(logMINOR)
+                               Logger.minor(this, "Allocated " + candidate + " in allocateOutgoingPacketNumberNeverBlock for " + this);
+                       return candidate;
+               }
+       }
+
+       /**
+        * Drain the forgotten-packet queue.
+        * @return The packet numbers of every entry that was queued; the
+        * queue is empty afterwards.
+        */
+       public int[] grabForgotten() {
+               if(logMINOR)
+                       Logger.minor(this, "Grabbing forgotten packet numbers");
+               final int[] forgotten;
+               synchronized(forgottenQueue) {
+                       forgotten = new int[forgottenQueue.size()];
+                       int filled = 0;
+                       // Copy each number out and drop its entry as we go.
+                       for(Iterator it = forgottenQueue.iterator(); it.hasNext(); filled++) {
+                               QueuedForgotten entry = (QueuedForgotten) it.next();
+                               forgotten[filled] = entry.packetNumber;
+                               if(logMINOR)
+                                       Logger.minor(this, "Grabbing ack " + entry.packetNumber + " from " + this);
+                               it.remove();    // sent
+                       }
+               }
+               return forgotten;
+       }
+
+       /**
+        * Put a range of forgotten packet numbers back on the forgotten queue.
+        * @param forgotPackets Array holding the packet numbers to requeue.
+        * @param start Index in forgotPackets of the first number to requeue.
+        * @param length How many consecutive entries to requeue. The range
+        * start .. start+length-1 must lie within forgotPackets.
+        */
+       public void requeueForgot(int[] forgotPackets, int start, int length) {
+               if(forgotPackets == null || length <= 0)
+                       return; // nothing to requeue
+               final int end = start + length;
+               synchronized(forgottenQueue) { // It doesn't do anything else does it? REDFLAG
+                       for(int i = start; i < end; i++) {
+                               // Requeue the packet number stored at index i, not the index itself.
+                               queueForgotten(forgotPackets[i], false);
+                       }
+               }
+       }
+
+       /**
+        * Drain the ack queue.
+        * @return An array of the packet numbers of every queued ack; the
+        * queue is empty afterwards.
+        */
+       public int[] grabAcks() {
+               if(logMINOR)
+                       Logger.minor(this, "Grabbing acks");
+               final int[] grabbed;
+               synchronized(ackQueue) {
+                       grabbed = new int[ackQueue.size()];
+                       int filled = 0;
+                       // Copy each number out and drop its entry as we go.
+                       for(Iterator<QueuedAck> it = ackQueue.iterator(); it.hasNext(); filled++) {
+                               QueuedAck queued = it.next();
+                               grabbed[filled] = queued.packetNumber;
+                               if(logMINOR)
+                                       Logger.minor(this, "Grabbing ack " + queued.packetNumber + " from " + this);
+                               it.remove();    // sent
+                       }
+               }
+               return grabbed;
+       }
+
+       /**
+        * Grab the queued resend requests that are currently active.
+        * Requests for packets already present in packetNumbersReceived are
+        * removed from the queue; requests whose activeTime is still in the
+        * future are left alone.
+        * @return An array of the packet numbers of all the packets we want to be resent.
+        * @throws NotConnectedException If the resend request queue has been killed.
+        */
+       public int[] grabResendRequests() throws NotConnectedException {
+               final long now = System.currentTimeMillis();
+               int[] due;
+               int dueCount = 0;
+               try {
+                       synchronized(resendRequestQueue) {
+                               UpdatableSortedLinkedListItem[] queued = resendRequestQueue.toArray();
+                               due = new int[queued.length];
+                               for(UpdatableSortedLinkedListItem item : queued) {
+                                       QueuedResendRequest request = (QueuedResendRequest) item;
+                                       if(packetNumbersReceived.contains(request.packetNumber)) {
+                                               if(logMINOR)
+                                                       Logger.minor(this, "Have already seen " + request.packetNumber + ", removing from resend list");
+                                               resendRequestQueue.remove(request);
+                                               continue;
+                                       }
+                                       if(request.activeTime > now) {
+                                               // Not active yet.
+                                               if(logMINOR)
+                                                       Logger.minor(this, "Rejecting resend request: " + request.packetNumber + " - in future by " + (request.activeTime - now) + "ms for " + this);
+                                               continue;
+                                       }
+                                       due[dueCount++] = request.packetNumber;
+                                       if(logMINOR)
+                                               Logger.minor(this, "Grabbing resend request: " + request.packetNumber + " from " + this);
+                                       request.sent();
+                               }
+                       }
+               } catch(UpdatableSortedLinkedListKilledException e) {
+                       throw new NotConnectedException();
+               }
+               // Trim the scratch array down to the entries actually filled.
+               int[] trimmed = new int[dueCount];
+               System.arraycopy(due, 0, trimmed, 0, dueCount);
+               return trimmed;
+       }
+
+       public int[] grabAckRequests() throws NotConnectedException, 
StillNotAckedException {
+               UpdatableSortedLinkedListItem[] items;
+               int[] packetNumbers;
+               int realLength;
+               if(logMINOR)
+                       Logger.minor(this, "Grabbing ack requests");
+               try {
+                       synchronized(ackRequestQueue) {
+                               long now = System.currentTimeMillis();
+                               items = ackRequestQueue.toArray();
+                               int length = items.length;
+                               packetNumbers = new int[length];
+                               realLength = 0;
+                               for(int i = 0; i < length; i++) {
+                                       QueuedAckRequest qr = 
(QueuedAckRequest) items[i];
+                                       int packetNumber = qr.packetNumber;
+                                       if(qr.activeTime <= now) {
+                                               
if(sentPacketsContents.get(packetNumber) == null) {
+                                                       if(logMINOR)
+                                                               
Logger.minor(this, "Asking to ack packet which has already been acked: " + 
packetNumber + " on " + this + ".grabAckRequests");
+                                                       
ackRequestQueue.remove(qr);
+                                                       continue;
+                                               }
+                                               if(now - qr.createdTime > 2 * 
60 * 1000) {
+                                                       if(logMINOR)
+                                                               
Logger.minor(this, "Packet " + qr.packetNumber + " sent over " + (now - 
qr.createdTime) + "ms ago and still not acked on " + this + " for " + pn);
+                                                       if(now - qr.createdTime 
> 10 * 60 * 1000) {
+                                                               
Logger.error(this, "Packet " + qr.packetNumber + " sent over " + (now - 
qr.createdTime) + "ms ago and still not acked on " + this + " for " + pn);
+                                                               throw new 
StillNotAckedException(this);
+                                                       }
+                                               }
+                                               packetNumbers[realLength++] = 
packetNumber;
+                                               if(logMINOR)
+                                                       Logger.minor(this, 
"Grabbing ack request " + packetNumber + " (" + realLength + ") from " + this);
+                                               qr.sent();
+                                       } else if(logMINOR)
+                                               Logger.minor(this, "Ignoring 
ack request " + packetNumber + " (" + realLength + ") - will become active in " 
+ (qr.activeTime - now) + "ms on " + this + " - " + qr);
+                               }
+                       }
+               } catch(UpdatableSortedLinkedListKilledException e) {
+                       throw new NotConnectedException();
+               }
+               if(logMINOR)
+                       Logger.minor(this, "realLength now " + realLength);
+               int[] trimmedPacketNumbers = new int[realLength];
+               System.arraycopy(packetNumbers, 0, trimmedPacketNumbers, 0, 
realLength);
+               if(logMINOR)
+                       Logger.minor(this, "Returning " + 
trimmedPacketNumbers.length + " ackRequests");
+               return trimmedPacketNumbers;
+       }
+
+       /**
+        * Look at the head of the ack queue and the lowest entry of the
+        * resend request and ack request queues.
+        * @return The smallest urgentTime among those entries, or
+        * Long.MAX_VALUE if all three queues are empty.
+        */
+       public long getNextUrgentTime() {
+               long soonest = Long.MAX_VALUE;
+               synchronized(ackQueue) {
+                       if(!ackQueue.isEmpty())
+                               soonest = ackQueue.get(0).urgentTime;
+               }
+               synchronized(resendRequestQueue) {
+                       if(!resendRequestQueue.isEmpty()) {
+                               QueuedResendRequest lowest = (QueuedResendRequest) resendRequestQueue.getLowest();
+                               soonest = Math.min(soonest, lowest.urgentTime);
+                       }
+               }
+               synchronized(ackRequestQueue) {
+                       if(!ackRequestQueue.isEmpty()) {
+                               QueuedAckRequest lowest = (QueuedAckRequest) ackRequestQueue.getLowest();
+                               soonest = Math.min(soonest, lowest.urgentTime);
+                       }
+               }
+               return soonest;
+       }
+
+       /**
+        * @return The most recently allocated outgoing packet number, i.e.
+        * nextPacketNumber minus one.
+        */
+       public int getLastOutgoingSeqNumber() {
+               final int next;
+               synchronized(this) {
+                       next = nextPacketNumber;
+               }
+               return next - 1;
+       }
+
+       /**
+        * Report a packet has been sent: store it in sentPacketsContents and
+        * queue an ack request for it.
+        * @param data The data we have just sent (payload only, decrypted).
+        * @param seqNumber The packet number.
+        * @param callbacks Callbacks stored with the packet; may be null, but
+        * must not contain null elements.
+        * @param priority Priority stored with the packet.
+        * @throws NullPointerException If callbacks contains a null element.
+        * @throws NotConnectedException If the ack request queue has been killed.
+        */
+       public void sentPacket(byte[] data, int seqNumber, AsyncMessageCallback[] callbacks, short priority) throws NotConnectedException {
+               if(callbacks != null) {
+                       // Reject null callbacks up front, before anything is stored.
+                       for(AsyncMessageCallback cb : callbacks) {
+                               if(cb == null)
+                                       throw new NullPointerException();
+                       }
+               }
+               sentPacketsContents.add(seqNumber, data, callbacks, priority);
+               try {
+                       queueAckRequest(seqNumber);
+               } catch(UpdatableSortedLinkedListKilledException e) {
+                       throw new NotConnectedException();
+               }
+       }
+
+       /**
+        * Clear this tracker. Deprecate it, clear the ack queue and the
+        * packets-to-resend set, kill the resend request and ack request
+        * queues, and forget which packet numbers were received.
+        * Return the messages we still had in flight. The caller will then
+        * either requeue them (see completelyDeprecated) or call their
+        * callbacks to indicate failure (see disconnected).
+        * @return The elements grabbed from sentPacketsContents.
+        */
+       private LimitedRangeIntByteArrayMapElement[] clear() {
+               if(logMINOR)
+                       Logger.minor(this, "Clearing " + this);
+               isDeprecated = true; // from here on the allocate*() methods throw KeyChangedException
+               LimitedRangeIntByteArrayMapElement[] elements;
+               synchronized(sentPacketsContents) {
+                       elements = sentPacketsContents.grabAll(); // will clear
+               }
+               synchronized(ackQueue) {
+                       ackQueue.clear();
+               }
+               synchronized(resendRequestQueue) {
+                       resendRequestQueue.kill(); // presumably later use throws UpdatableSortedLinkedListKilledException - confirm
+               }
+               synchronized(ackRequestQueue) {
+                       ackRequestQueue.kill();
+               }
+               synchronized(packetsToResend) {
+                       packetsToResend.clear();
+               }
+               packetNumbersReceived.clear();
+               return elements;
+       }
+
+       /**
+        * Completely deprecate this tracker, in favour of a new one.
+        * It will no longer be used for anything. The tracker is cleared and
+        * every packet still in flight is wrapped in a MessageItem and handed
+        * back to the PeerNode for requeueing, after which the PacketSender is
+        * woken.
+        *
+        * *** Must only be called if the tracker is not to be kept. Otherwise, we may receive some packets twice. ***
+        * @param newTracker The replacement tracker; only used for logging here.
+        */
+       public void completelyDeprecated(KeyTracker newTracker) {
+               if(logMINOR)
+                       Logger.minor(this, "Completely deprecated: " + this + " in favour of " + newTracker);
+               final LimitedRangeIntByteArrayMapElement[] inFlight = clear();
+               final int total = inFlight.length;
+               if(total == 0)
+                       return; // nothing more to do
+               final MessageItem[] resends = new MessageItem[total];
+               int slot = 0;
+               for(LimitedRangeIntByteArrayMapElement sent : inFlight) {
+                       // The old packet number is only logged, not carried over.
+                       if(logMINOR)
+                               Logger.minor(this, "Queueing resend of what was once " + sent.packetNumber);
+                       resends[slot++] = new MessageItem(sent.data, sent.callbacks, true, pn.resendByteCounter, sent.priority);
+               }
+               pn.requeueMessageItems(resends, 0, total, true);
+
+               pn.node.ps.wakeUp();
+       }
+
+       /**
+        * Called when the node appears to have been disconnected.
+        * Clears the tracker and calls disconnected() on every callback of
+        * every packet that was still in flight.
+        */
+       public void disconnected() {
+               // Clear everything, call the callbacks
+               for(LimitedRangeIntByteArrayMapElement dropped : clear()) {
+                       AsyncMessageCallback[] callbacks = dropped.callbacks;
+                       if(callbacks == null)
+                               continue;
+                       for(AsyncMessageCallback cb : callbacks)
+                               cb.disconnected();
+               }
+       }
+
+       /**
+        * Fill rpiTemp with ResendPacketItems of packets that need to be
+        * resent.
+        * @param rpiTemp Cleared on entry, then filled with one
+        * ResendPacketItem per selected packet whose contents are still held
+        * in sentPacketsContents.
+        * @param numbers Scratch array for the selected packet numbers; if it
+        * is shorter than packetsToResend, a new array of twice that size is
+        * used instead.
+        * @return An array of integers which contains the packet numbers
+        * to be resent (the RPI's are put into rpiTemp), or null if there
+        * are no packets to resend. The array may be longer than the number
+        * of selected packets, and may include numbers that were skipped
+        * because their contents were null.
+        *
+        * NOTE(review): the threshold variable is called fourRTTs and the
+        * inline comment says 4 RTTs, but the value comes from twoRTTs() -
+        * confirm which interval is intended.
+        * NOTE(review): packetsToResend.clear() after the loop also drops the
+        * entries that were not yet due for resending - confirm this is
+        * intended.
+        *
+        * Not a very nice API, but it saves a load of allocations, and at
+        * least it's documented!
+        */
+       public int[] grabResendPackets(Vector<ResendPacketItem> rpiTemp, int[] 
numbers) {
+               rpiTemp.clear();
+               long now = System.currentTimeMillis();
+               long fourRTTs = twoRTTs(); // NOTE(review): name and source disagree
+               int count = 0;
+               synchronized(packetsToResend) {
+                       int len = packetsToResend.size();
+                       if(numbers.length < len)
+                               numbers = new int[len * 2];
+                       for(Iterator<Integer> it = packetsToResend.iterator(); 
it.hasNext();) {
+                               int packetNo = it.next();
+                               long resentTime = 
sentPacketsContents.getReaddedTime(packetNo);
+                               if(now - resentTime > fourRTTs) {
+                                       // Either never resent, or resent at 
least 4 RTTs ago
+                                       numbers[count++] = packetNo;
+                                       it.remove();
+                               }
+                       }
+                       packetsToResend.clear(); // NOTE(review): removes the not-yet-due entries too
+               }
+               for(int i = 0; i < count; i++) {
+                       int packetNo = numbers[i];
+                       byte[] buf = sentPacketsContents.get(packetNo);
+                       if(buf == null) {
+                               if(logMINOR)
+                                       Logger.minor(this, "Contents null for " 
+ packetNo + " in grabResendPackets on " + this);
+                               continue; // acked already?
+                       }
+                       AsyncMessageCallback[] callbacks = 
sentPacketsContents.getCallbacks(packetNo);
+                       short priority = 
sentPacketsContents.getPriority(packetNo, DMT.PRIORITY_BULK_DATA);
+                       rpiTemp.add(new ResendPacketItem(buf, packetNo, this, 
callbacks, priority));
+               }
+               if(rpiTemp.isEmpty())
+                       return null;
+               return numbers;
+       }
+
+       /** @return True if packetsToResend currently holds at least one packet number. */
+       public boolean hasPacketsToResend() {
+               final boolean noneQueued;
+               synchronized(packetsToResend) {
+                       noneQueued = packetsToResend.isEmpty();
+               }
+               return !noneQueued;
+       }
+
+       /** @return The current value of the volatile isDeprecated flag. */
+       public boolean isDeprecated() {
+               return isDeprecated;
+       }
+
+       /** @return The number of entries currently in the ack request queue. */
+       public int countAckRequests() {
+               final int queued;
+               synchronized(ackRequestQueue) {
+                       queued = ackRequestQueue.size();
+               }
+               return queued;
+       }
+
+       /** @return The number of entries currently in the resend request queue. */
+       public int countResendRequests() {
+               final int queued;
+               synchronized(resendRequestQueue) {
+                       queued = resendRequestQueue.size();
+               }
+               return queued;
+       }
+
+       /** @return The number of entries currently in the ack queue. */
+       public int countAcks() {
+               final int queued;
+               synchronized(ackQueue) {
+                       queued = ackQueue.size();
+               }
+               return queued;
+       }
+
+       /** @return The value of the timeLastDecodedPacket field, read while holding this tracker's lock. */
+       public synchronized long timeLastDecodedPacket() {
+               final long lastDecoded = this.timeLastDecodedPacket;
+               return lastDecoded;
+       }
+}

_______________________________________________
cvs mailing list
[email protected]
http://emu.freenetproject.org/cgi-bin/mailman/listinfo/cvs

Reply via email to