Author: toad
Date: 2008-09-19 16:53:54 +0000 (Fri, 19 Sep 2008)
New Revision: 22707

Modified:
   trunk/freenet/src/freenet/node/FNPPacketMangler.java
   trunk/freenet/src/freenet/node/KeyTracker.java
   trunk/freenet/src/freenet/node/OutgoingPacketMangler.java
   trunk/freenet/src/freenet/node/PacketSender.java
   trunk/freenet/src/freenet/node/PeerNode.java
Log:
Send one packet to a peer, and then go around the PacketSender loop again.
If no other peer needs a packet, then send another packet to the same peer.
This should be much fairer when one node has a severe backlog, or when sending 
a packet is taking ages for some reason.
And it fits well with PacketSender-level throttling (coming soon).


Modified: trunk/freenet/src/freenet/node/FNPPacketMangler.java
===================================================================
--- trunk/freenet/src/freenet/node/FNPPacketMangler.java        2008-09-19 
15:59:04 UTC (rev 22706)
+++ trunk/freenet/src/freenet/node/FNPPacketMangler.java        2008-09-19 
16:53:54 UTC (rev 22707)
@@ -2000,7 +2000,7 @@
        /* (non-Javadoc)
         * @see 
freenet.node.OutgoingPacketMangler#processOutgoingOrRequeue(freenet.node.MessageItem[],
 freenet.node.PeerNode, boolean, boolean)
         */
-       public void processOutgoingOrRequeue(MessageItem[] messages, PeerNode 
pn, boolean neverWaitForPacketNumber, boolean dontRequeue) {
+       public void processOutgoingOrRequeue(MessageItem[] messages, PeerNode 
pn, boolean neverWaitForPacketNumber, boolean dontRequeue, boolean onePacket) {
                String requeueLogString = "";
                if(!dontRequeue) {
                        requeueLogString = ", requeueing";
@@ -2192,6 +2192,10 @@
                                                                
pn.requeueMessageItems(messages, lastIndex, messages.length - lastIndex, false, 
"Throwable(3)");
                                                        return;
                                                }
+                                               if(onePacket) {
+                                                       
pn.requeueMessageItems(messages, i, messageData.length - i, true, "Didn't fit 
in single packet");
+                                                       return;
+                                               }
                                        }
                                        lastIndex = i;
                                        if(i != messageData.length) {

Modified: trunk/freenet/src/freenet/node/KeyTracker.java
===================================================================
--- trunk/freenet/src/freenet/node/KeyTracker.java      2008-09-19 15:59:04 UTC 
(rev 22706)
+++ trunk/freenet/src/freenet/node/KeyTracker.java      2008-09-19 16:53:54 UTC 
(rev 22707)
@@ -1054,6 +1054,12 @@
                        return null;
                return numbers;
        }
+       
+       public boolean hasPacketsToResend() {
+               synchronized(packetsToResend) {
+                       return !packetsToResend.isEmpty();
+               }
+       }

        public boolean isDeprecated() {
                return this.isDeprecated;

Modified: trunk/freenet/src/freenet/node/OutgoingPacketMangler.java
===================================================================
--- trunk/freenet/src/freenet/node/OutgoingPacketMangler.java   2008-09-19 
15:59:04 UTC (rev 22706)
+++ trunk/freenet/src/freenet/node/OutgoingPacketMangler.java   2008-09-19 
16:53:54 UTC (rev 22707)
@@ -24,9 +24,11 @@
         * Build one or more packets and send them, from a whole bunch of 
messages.
         * If any MessageItem's are formatted already, they will be sent as 
single packets.
         * Any packets which cannot be sent will be requeued on the PeerNode.
+        * @param onePacketOnly If true, we will only send one packet, and will 
requeue any
+        * messages that don't fit in that single packet.
         */
        public void processOutgoingOrRequeue(MessageItem[] messages, PeerNode 
pn,
-                       boolean neverWaitForPacketNumber, boolean dontRequeue);
+                       boolean neverWaitForPacketNumber, boolean dontRequeue, 
boolean onePacketOnly);

        /**
         * Resend a single packet.

Modified: trunk/freenet/src/freenet/node/PacketSender.java
===================================================================
--- trunk/freenet/src/freenet/node/PacketSender.java    2008-09-19 15:59:04 UTC 
(rev 22706)
+++ trunk/freenet/src/freenet/node/PacketSender.java    2008-09-19 16:53:54 UTC 
(rev 22707)
@@ -230,9 +230,9 @@
                                        continue;
                                }

-                               pn.maybeSendSomething(now, rpiTemp, rpiIntTemp);
+                               pn.maybeSendPacket(now, rpiTemp, rpiIntTemp);

-                               long urgentTime = pn.getNextUrgentTime();
+                               long urgentTime = pn.getNextUrgentTime(now);
                                // Should spam the logs, unless there is a 
deadlock
                                if(urgentTime < Long.MAX_VALUE && logMINOR)
                                        Logger.minor(this, "Next urgent time: " 
+ urgentTime + " for " + pn.getPeer());

Modified: trunk/freenet/src/freenet/node/PeerNode.java
===================================================================
--- trunk/freenet/src/freenet/node/PeerNode.java        2008-09-19 15:59:04 UTC 
(rev 22706)
+++ trunk/freenet/src/freenet/node/PeerNode.java        2008-09-19 16:53:54 UTC 
(rev 22707)
@@ -11,6 +11,7 @@
 import java.net.MalformedURLException;
 import java.net.UnknownHostException;
 import java.security.MessageDigest;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -1308,17 +1309,41 @@
        * it means it will only contains ack requests etc., or
        * Long.MAX_VALUE if we have no pending ack request/acks/etc.
        */
-       public synchronized long getNextUrgentTime() {
+       public long getNextUrgentTime(long now) {
                long t = Long.MAX_VALUE;
+               synchronized(this) {
                KeyTracker kt = currentTracker;
-               if(kt != null)
+               if(kt != null) {
                        t = Math.min(t, kt.getNextUrgentTime());
+                       if(kt.hasPacketsToResend()) return now;
+               }
                kt = previousTracker;
-               if(kt != null)
+               if(kt != null) {
                        t = Math.min(t, kt.getNextUrgentTime());
+                       if(kt.hasPacketsToResend()) return now;
+               }
+               }
+               synchronized(messagesToSendNow) {
+                       for(LinkedList list : messagesToSendNow) {
+                               if(list.isEmpty()) continue;
+                               MessageItem item = (MessageItem) 
list.getFirst();
+                               t = Math.min(t, item.submitted + 
PacketSender.MAX_COALESCING_DELAY);
+                       }
+               }
                return t;
        }
-
+       
+       private synchronized boolean mustSendNotificationsNow(long now) {
+               KeyTracker kt = currentTracker;
+               if(kt != null) {
+                       if(kt.getNextUrgentTime() < now) return true;
+               }
+               kt = previousTracker;
+               if(kt != null)
+                       if(kt.getNextUrgentTime() < now) return true;
+               return false;
+       }
+       
        /**
        * @return The time at which we last sent a packet.
        */
@@ -4040,7 +4065,7 @@
        public void maybeSendPacket(long now, Vector rpiTemp, int[] rpiIntTemp) 
{
                // If there are any urgent notifications, we must send a packet.
                boolean mustSend = false;
-               if(getNextUrgentTime() < now) {
+               if(mustSendNotificationsNow(now)) {
                        mustSend = true;
                }
                // Any packets to resend? If so, resend ONE packet and then 
return.
@@ -4087,104 +4112,136 @@
                        }

                }
-               // 
-       }
-       
-       public void maybeSendSomething(long now, Vector rpiTemp, int[] 
rpiIntTemp) {
-               boolean mustSend = false;
-
-               // Any urgent notifications to send?
-               long urgentTime = getNextUrgentTime();
-               if(urgentTime <= now)
+               int minSize = 
getOutgoingMangler().fullHeadersLengthOneMessage(); // includes UDP headers
+               int maxSize = ((PacketSocketHandler) 
getSocketHandler()).getPacketSendThreshold();
+               
+               // If it's a keepalive, we must add an FNPVoid to ensure it has 
a packet number.
+               boolean keepalive = false;
+               
+               if(now - lastSentPacketTime() > Node.KEEPALIVE_INTERVAL) {
+                       if(logMINOR)
+                               Logger.minor(this, "Sending keepalive");
+                       keepalive = true;
                        mustSend = true;
-               checkTrackerTimeout();
+               }

-               // Any packets to resend?
-               for(int j = 0; j < 2; j++) {
-                       KeyTracker kt;
-                       if(j == 0)
-                               kt = getCurrentKeyTracker();
-                       else if(j == 1)
-                               kt = getPreviousKeyTracker();
-                       else
-                               break; // impossible
-                       if(kt == null)
-                               continue;
-                       int[] tmp = kt.grabResendPackets(rpiTemp, rpiIntTemp);
-                       if(tmp == null)
-                               continue;
-                       rpiIntTemp = tmp;
-                       for(int k = 0; k < rpiTemp.size(); k++) {
-                               ResendPacketItem item = (ResendPacketItem) 
rpiTemp.get(k);
-                               if(item == null)
-                                       continue;
-                               try {
-                                       if(logMINOR)
-                                               Logger.minor(this, "Resending " 
+ item.packetNumber + " to " + item.kt);
-                                       getOutgoingMangler().resend(item);
-                                       mustSend = false;
-                               } catch(KeyChangedException e) {
-                                       Logger.error(this, "Caught " + e + " 
resending packets to " + kt);
-                                       requeueResendItems(rpiTemp);
-                                       break;
-                               } catch(NotConnectedException e) {
-                                       Logger.normal(this, "Caught " + e + " 
resending packets to " + kt);
-                                       requeueResendItems(rpiTemp);
-                                       break;
-                               } catch(PacketSequenceException e) {
-                                       Logger.error(this, "Caught " + e + " - 
disconnecting", e);
-                                       // PSE is fairly drastic, something is 
broken between us, but maybe we can resync
-                                       forceDisconnect(false); 
-                               } catch(WouldBlockException e) {
-                                       Logger.error(this, "Impossible: " + e, 
e);
+               ArrayList<MessageItem> messages = new 
ArrayList<MessageItem>(10);
+               
+               synchronized(messagesToSendNow) {
+                       
+                       // Any urgent messages to send now?
+                       
+                       if(!mustSend) {
+                               for(int i=0;i<messagesToSendNow.length;i++) {
+                                       if(!messagesToSendNow[i].isEmpty()) {
+                                               if(((MessageItem) 
messagesToSendNow[i].getFirst()).submitted + 
+                                                               
PacketSender.MAX_COALESCING_DELAY <= now) {
+                                                       mustSend = true;
+                                                       break;
+                                               }
+                                       }
                                }
                        }
-
-               }
-
-               // Any messages to send?
-               MessageItem[] messages = null;
-               messages = grabQueuedMessageItems();
-               if((messages != null) && (messages.length > 0)) {
-                       long l = Long.MAX_VALUE;
-                       int sz = 
getOutgoingMangler().fullHeadersLengthOneMessage(); // includes UDP headers 
-                       for(int j = 0; j < messages.length; j++) {
-                               if(l > messages[j].submitted)
-                                       l = messages[j].submitted;
-                               sz += 2 + /* FIXME only 2? */ 
messages[j].getLength();
+                       
+                       if(!mustSend) {
+                               // What about total length?
+                               int length = minSize;
+                               for(LinkedList items : messagesToSendNow) {
+                                       for(Object o : items) {
+                                               MessageItem i = (MessageItem) o;
+                                               if(length + maxSize > maxSize) {
+                                                       mustSend = true;
+                                                       break;
+                                               } else length += maxSize;
+                                       }
+                               }
                        }
-                       if(node.enablePacketCoalescing && (l + 
PacketSender.MAX_COALESCING_DELAY > now) &&
-                               (sz < ((PacketSocketHandler) 
getSocketHandler()).getPacketSendThreshold())) {
-                               // Don't send immediately
-                               requeueMessageItems(messages, 0, 
messages.length, true, "TrafficCoalescing");
-                       } else {
-                               for(int j = 0; j < messages.length; j++)
-                                       if(logMINOR)
-                                               Logger.minor(this, "PS Sending: 
" + (messages[j].msg == null ? "(not a Message)" : 
messages[j].msg.getSpec().getName())+" to "+this);
-                               // Send packets, right now, blocking, including 
any active notifications
-                               
getOutgoingMangler().processOutgoingOrRequeue(messages, this, true, false);
-                               return;
+                       
+                       if(mustSend) {
+                               int size = minSize;
+                               boolean gotEnough = false;
+                               while(!gotEnough) {
+                                       // Urgent messages first.
+                                       boolean foundNothingUrgent = true;
+                                       for(LinkedList items : 
messagesToSendNow) {
+                                               while(!gotEnough) {
+                                                       if(items.isEmpty()) {
+                                                               break;
+                                                       }
+                                                       MessageItem item = 
(MessageItem) items.getFirst();
+                                                       if(item.submitted + 
PacketSender.MAX_COALESCING_DELAY <= now) {
+                                                               
foundNothingUrgent = false;
+                                                               int thisSize = 
item.getLength();
+                                                               if(size + 
thisSize > maxSize) {
+                                                                       if(size 
== minSize) {
+                                                                               
// Send it anyway, nothing else to send.
+                                                                               
size += thisSize;
+                                                                               
items.removeFirst();
+                                                                               
messages.add(item);
+                                                                               
gotEnough = true;
+                                                                               
break;
+                                                                       }
+                                                                       
gotEnough = true;
+                                                                       break; 
// More items won't fit.
+                                                               }
+                                                               size += 
thisSize;
+                                                               
items.removeFirst();
+                                                               
messages.add(item);
+                                                       } else {
+                                                               break;
+                                                       }
+                                               }
+                                       }
+                                       if(foundNothingUrgent) break;
+                               }
+                               
+                               // Now the not-so-urgent messages.
+                               while(!gotEnough) {
+                                       boolean foundNothing = true;
+                                       for(LinkedList items : 
messagesToSendNow) {
+                                               while(!gotEnough) {
+                                                       if(items.isEmpty()) {
+                                                               break;
+                                                       }
+                                                       MessageItem item = 
(MessageItem) items.getFirst();
+                                                       foundNothing = false;
+                                                       int thisSize = 
item.getLength();
+                                                       if(size + thisSize > 
maxSize) {
+                                                               if(size == 
minSize) {
+                                                                       // Send 
it anyway, nothing else to send.
+                                                                       size += 
thisSize;
+                                                                       
items.removeFirst();
+                                                                       
messages.add(item);
+                                                                       
gotEnough = true;
+                                                                       break;
+                                                               }
+                                                               gotEnough = 
true;
+                                                               break; // More 
items won't fit.
+                                                       }
+                                                       size += thisSize;
+                                                       items.removeFirst();
+                                                       messages.add(item);
+                                               }
+                                       }
+                                       if(foundNothing) break;
+                               }
                        }
+                       
                }
-
-               if(mustSend)
-                       // Send them
-
-                       try {
-                               sendAnyUrgentNotifications(false);
-                       } catch(PacketSequenceException e) {
-                               Logger.error(this, "Caught " + e + " - while 
sending urgent notifications : disconnecting", e);
-                               forceDisconnect(false);
-                       }
-
-               // Need to send a keepalive packet?
-               if(now - lastSentPacketTime() > Node.KEEPALIVE_INTERVAL) {
-                       if(logMINOR)
-                               Logger.minor(this, "Sending keepalive");
+               
+               if(messages.isEmpty() && keepalive) {
                        // Force packet to have a sequence number.
                        Message m = DMT.createFNPVoid();
                        addToLocalNodeSentMessagesToStatistic(m);
-                       getOutgoingMangler().processOutgoingOrRequeue(new 
MessageItem[]{new MessageItem(m, null, 0, null, this)}, this, true, true);
+                       messages.add(new MessageItem(m, null, 0, null, this));
                }
+               
+               if(messages.isEmpty()) return;
+               
+               // Send packets, right now, blocking, including any active 
notifications
+               // Note that processOutgoingOrRequeue will drop messages from 
the end
+               // if necessary to fit the messages into a single packet.
+               
getOutgoingMangler().processOutgoingOrRequeue(messages.toArray(new 
MessageItem[messages.size()]), this, true, false, true);
+
        }
 }


Reply via email to