Author: toad
Date: 2006-12-21 23:12:24 +0000 (Thu, 21 Dec 2006)
New Revision: 11508

Modified:
   trunk/freenet/src/freenet/node/FNPPacketMangler.java
   trunk/freenet/src/freenet/node/KeyTracker.java
Log:
If a deprecated KeyTracker (whose sentPacketsContents has been cleared and the 
messages transferred to the new one) gets a resend request, send a forgotten 
packet notification. This will cause the sender to not send us any more resend 
requests.


Modified: trunk/freenet/src/freenet/node/FNPPacketMangler.java
===================================================================
--- trunk/freenet/src/freenet/node/FNPPacketMangler.java        2006-12-21 
19:54:38 UTC (rev 11507)
+++ trunk/freenet/src/freenet/node/FNPPacketMangler.java        2006-12-21 
23:12:24 UTC (rev 11508)
@@ -1201,7 +1201,7 @@

         // We do not support forgotten packets at present

-        int[] acks, resendRequests, ackRequests;
+        int[] acks, resendRequests, ackRequests, forgotPackets;
        int seqNumber;
         /* Locking:
          * Avoid allocating a packet number, then a long pause due to 
@@ -1235,6 +1235,7 @@

                synchronized(tracker) {
                acks = tracker.grabAcks();
+               forgotPackets = tracker.grabForgotten();
                resendRequests = tracker.grabResendRequests();
                ackRequests = tracker.grabAckRequests();
             realSeqNumber = tracker.getLastOutgoingSeqNumber();
@@ -1326,9 +1327,42 @@
             plaintext[ptr++] = (byte)offsetSeq;
         }

-        // No forgotten packets
-        plaintext[ptr++] = 0;
-
+        byte[] forgotOffsets = null;
+        int forgotCount = 0;
+        
+        if(forgotPackets.length > 0) {
+               for(int i=0;i<forgotPackets.length;i++) {
+                       int seq = forgotPackets[i];
+                       if(logMINOR) Logger.minor(this, "Forgot packet "+i+": 
"+seq);
+                       int offsetSeq = realSeqNumber - seq;
+                       if((offsetSeq > 255) || (offsetSeq < 0)) {
+                               if(tracker.isDeprecated()) {
+                                       // Oh well
+                                       Logger.error(this, "Dropping 
forgot-packet notification on deprecated tracker: "+seq+" on "+tracker+" - real 
seq="+realSeqNumber);
+                                       // Ignore it
+                                       continue;
+                               } else {
+                                       Logger.error(this, "bad forgot packet 
offset: "+offsetSeq+
+                                " - forgotSeq="+seq+", 
packetNumber="+realSeqNumber+" talking to "+tracker.pn.getPeer(), new 
Exception("error"));
+                               }
+                       } else {
+                               if(forgotOffsets == null)
+                                       forgotOffsets = new 
byte[forgotPackets.length - i];
+                               forgotOffsets[i] = (byte) offsetSeq;
+                               forgotCount++;
+                               if(forgotCount == 256)
+                                       tracker.requeueForgot(forgotPackets, 
forgotCount, forgotPackets.length - forgotCount);
+                       }
+               }
+        }
+        
+        plaintext[ptr++] = (byte) forgotCount;
+        
+        if(forgotOffsets != null) {
+               System.arraycopy(forgotOffsets, 0, plaintext, ptr, forgotCount);
+               ptr += forgotCount;
+        }
+        
         System.arraycopy(buf, offset, plaintext, ptr, length);
         ptr += length;

@@ -1345,10 +1379,10 @@
             tracker.sentPacket(saveable, seqNumber, callbacks);
         }

-        if(logMINOR) Logger.minor(this, "Sending...");
+        if(logMINOR) Logger.minor(this, "Sending... "+seqNumber);

         processOutgoingFullyFormatted(plaintext, tracker, callbacks, 
alreadyReportedBytes);
-        if(logMINOR) Logger.minor(this, "Sent packet");
+        if(logMINOR) Logger.minor(this, "Sent packet "+seqNumber);
     }

     /**

Modified: trunk/freenet/src/freenet/node/KeyTracker.java
===================================================================
--- trunk/freenet/src/freenet/node/KeyTracker.java      2006-12-21 19:54:38 UTC 
(rev 11507)
+++ trunk/freenet/src/freenet/node/KeyTracker.java      2006-12-21 23:12:24 UTC 
(rev 11508)
@@ -58,6 +58,13 @@
      * be consistent. */
     private final DoublyLinkedList ackQueue;

+    /** Serial numbers of packets that we have forgotten. Usually
+     * when we have forgotten a packet it just means that it has 
+     * been shifted to another KeyTracker because this one was
+     * deprecated; the messages will get through in the end.
+     */
+    private final DoublyLinkedList forgottenQueue;
+    
     /** The highest incoming serial number we have ever seen
      * from the other side. Includes actual packets and resend
      * requests (provided they are within range). */
@@ -92,6 +99,7 @@
         this.sessionCipher = cipher;
         this.sessionKey = sessionKey;
         ackQueue = new DoublyLinkedListImpl();
+        forgottenQueue = new DoublyLinkedListImpl();
         highestSeenIncomingSerialNumber = -1;
         // give some leeway
         sentPacketsContents = new LimitedRangeIntByteArrayMap(128);
@@ -150,6 +158,22 @@
         // Will go urgent in 200ms
     }

+    public void queueForgotten(int seqNumber) {
+       queueForgotten(seqNumber, true);
+    }
+    
+    public void queueForgotten(int seqNumber, boolean log) {
+       if(log && ((!isDeprecated) || logMINOR)) {
+               String msg = "Queueing forgotten for "+seqNumber+" for "+this;
+               if(!isDeprecated) Logger.error(this, msg);
+               else Logger.minor(this, msg);
+       }
+       QueuedForgotten qf = new QueuedForgotten(seqNumber);
+       synchronized(forgottenQueue) {
+               forgottenQueue.push(qf);
+       }
+    }
+    
     class PacketActionItem { // anyone got a better name?
         /** Packet sequence number */
         int packetNumber;
@@ -214,6 +238,59 @@
                }
     }

+    // FIXME this is almost identical to QueuedAck, coalesce the classes
+    class QueuedForgotten extends PacketActionItem implements 
DoublyLinkedList.Item {
+        void sent() {
+            synchronized(forgottenQueue) {
+                forgottenQueue.remove(this);
+            }
+        }
+        
+        QueuedForgotten(int packet) {
+            long now = System.currentTimeMillis();
+            packetNumber = packet;
+            /** If not included on a packet in next 500ms, then
+             * force a send of an otherwise empty packet.
+             */
+            urgentTime = now + 500;
+        }
+
+        Item prev;
+        Item next;
+        
+        public Item getNext() {
+            return next;
+        }
+
+        public Item setNext(Item i) {
+            Item old = next;
+            next = i;
+            return old;
+        }
+
+        public Item getPrev() {
+            return prev;
+        }
+
+        public Item setPrev(Item i) {
+            Item old = prev;
+            prev = i;
+            return old;
+        }
+
+        private DoublyLinkedList parent;
+        
+               public DoublyLinkedList getParent() {
+                       return parent;
+               }
+
+               public DoublyLinkedList setParent(DoublyLinkedList l) {
+                       DoublyLinkedList old = parent;
+                       parent = l;
+                       return old;
+               }
+    }
+
     private abstract class BaseQueuedResend extends PacketActionItem 
implements IndexableUpdatableSortedLinkedListItem {
         /** Time at which this item becomes sendable.
          * When we send a resend request, this is reset to t+500ms.
@@ -587,7 +664,11 @@
                        String msg = "Asking me to resend packet "+seqNumber+
                                " which we haven't sent yet or which they have 
already acked (next="+nextPacketNumber+ ')';
                        // Might just be late, but could indicate something 
serious.
-                       if(logMINOR) Logger.error(this, msg);
+                       if(isDeprecated) {
+                               Logger.minor(this, "Other side wants us to 
resend packet "+seqNumber+" for "+this+" - we cannot do this because we are 
deprecated");
+                       } else {
+                               Logger.normal(this, msg);
+                       }
                }
         }
     }
@@ -697,6 +778,33 @@
         }
     }

+    public int[] grabForgotten() {
+       if(logMINOR) Logger.minor(this, "Grabbing forgotten packet numbers");
+        int[] acks;
+        synchronized(forgottenQueue) {
+            // Grab the acks and tell them they are sent
+            int length = forgottenQueue.size();
+            acks = new int[length];
+            int i=0;
+            for(Enumeration e=forgottenQueue.elements();e.hasMoreElements();) {
+                QueuedForgotten ack = (QueuedForgotten)e.nextElement();
+                acks[i++] = ack.packetNumber;
+                if(logMINOR) Logger.minor(this, "Grabbing ack 
"+ack.packetNumber+" from "+this);
+                ack.sent();
+            }
+        }
+        return acks;
+    }
+
+       public void requeueForgot(int[] forgotPackets, int start, int length) {
+               synchronized(forgottenQueue) { // It doesn't do anything else 
does it? REDFLAG
+                       for(int i=start;i<start+length;i++) {
+                               queueForgotten(i, false);
+                       }
+               }
+       }
+
+    
     /**
      * Grab all the currently queued acks to be sent to this node.
      * @return An array of packet numbers that we need to acknowledge.
@@ -977,4 +1085,5 @@
                        return ackQueue.size();
                }
        }
+
 }


Reply via email to