Author: toad
Date: 2008-09-23 23:14:57 +0000 (Tue, 23 Sep 2008)
New Revision: 22784

Modified:
   trunk/freenet/src/freenet/node/PeerMessageQueue.java
Log:
Round-robin between different IDs within a priority.


Modified: trunk/freenet/src/freenet/node/PeerMessageQueue.java
===================================================================
--- trunk/freenet/src/freenet/node/PeerMessageQueue.java        2008-09-23 
19:11:59 UTC (rev 22783)
+++ trunk/freenet/src/freenet/node/PeerMessageQueue.java        2008-09-23 
23:14:57 UTC (rev 22784)
@@ -1,8 +1,10 @@
 package freenet.node;

 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.LinkedList;
+import java.util.Map;

 import freenet.io.comm.DMT;
 import freenet.support.Logger;
@@ -14,24 +16,262 @@
  */
 public class PeerMessageQueue {

-       private final LinkedList[] queuesByPriority;
+       private final PrioQueue[] queuesByPriority;
+
+       private class PrioQueue {
+               LinkedList<MessageItem> itemsNoID;
+               ArrayList<LinkedList<MessageItem>> itemsWithID;
+               Map<Long, LinkedList<MessageItem>> itemsByID;
+               // Construct structures lazily, we're protected by the overall 
synchronized.
+               
+               /** 0 = itemsNoID, else 1-N = in itemsWithID[0-(N-1)].
+                * Set when a packet is sent. */
+               private int roundRobinCounter;
+               
+               public void addLast(MessageItem item) {
+                       if(item.msg == null) {
+                               if(itemsNoID == null) itemsNoID = new 
LinkedList<MessageItem>();
+                               itemsNoID.addLast(item);
+                               return;
+                       }
+                       Object o = item.msg.getObject(DMT.UID);
+                       if(o == null || !(o instanceof Long)) {
+                               if(itemsNoID == null) itemsNoID = new 
LinkedList<MessageItem>();
+                               itemsNoID.addLast(item);
+                               return;
+                       }
+                       Long id = (Long) o;
+                       LinkedList<MessageItem> list;
+                       if(itemsByID == null) {
+                               itemsByID = new HashMap<Long, 
LinkedList<MessageItem>>();
+                               itemsWithID = new 
ArrayList<LinkedList<MessageItem>>();
+                               list = new LinkedList<MessageItem>();
+                               itemsWithID.add(list);
+                               itemsByID.put(id, list);
+                       } else {
+                               list = itemsByID.get(id);
+                               if(list == null) {
+                                       list = new LinkedList<MessageItem>();
+                                       itemsWithID.add(list);
+                                       itemsByID.put(id, list);
+                               }
+                       }
+                       list.addLast(item);
+               }
+               
+               public void addFirst(MessageItem item) {
+                       if(item.msg == null) {
+                               if(itemsNoID == null) itemsNoID = new 
LinkedList<MessageItem>();
+                               itemsNoID.addFirst(item);
+                               return;
+                       }
+                       Object o = item.msg.getObject(DMT.UID);
+                       if(o == null || !(o instanceof Long)) {
+                               if(itemsNoID == null) itemsNoID = new 
LinkedList<MessageItem>();
+                               itemsNoID.addFirst(item);
+                               return;
+                       }
+                       Long id = (Long) o;
+                       LinkedList<MessageItem> list;
+                       if(itemsByID == null) {
+                               itemsByID = new HashMap<Long, 
LinkedList<MessageItem>>();
+                               itemsWithID = new 
ArrayList<LinkedList<MessageItem>>();
+                               list = new LinkedList<MessageItem>();
+                               itemsWithID.add(list);
+                               itemsByID.put(id, list);
+                       } else {
+                               list = itemsByID.get(id);
+                               if(list == null) {
+                                       list = new LinkedList<MessageItem>();
+                                       itemsWithID.add(list);
+                                       itemsByID.put(id, list);
+                               }
+                       }
+                       list.addFirst(item);
+               }
+
+               public int size() {
+                       int size = 0;
+                       if(itemsNoID != null)
+                               size += itemsNoID.size();
+                       if(itemsWithID != null)
+                               for(LinkedList<MessageItem> list : itemsWithID)
+                                       size += list.size();
+                       return size;
+               }
+
+               public int addTo(MessageItem[] output, int ptr) {
+                       if(itemsNoID != null)
+                               for(MessageItem item : itemsNoID)
+                                       output[ptr++] = item;
+                       if(itemsWithID != null)
+                               for(LinkedList<MessageItem> list : itemsWithID)
+                                       for(MessageItem item : list)
+                                               output[ptr++] = item;
+                       return ptr;
+               }
+
+               public long getNextUrgentTime(long t, long now) {
+                       if(itemsNoID != null) {
+                               t = Math.min(t, itemsNoID.getFirst().submitted 
+ PacketSender.MAX_COALESCING_DELAY);
+                               if(t <= now) return t;
+                       }
+                       if(itemsWithID != null) {
+                               for(LinkedList<MessageItem> items : 
itemsWithID) {
+                                       t = Math.min(t, 
items.getFirst().submitted + PacketSender.MAX_COALESCING_DELAY);
+                                       if(t <= now) return t;
+                               }
+                       }
+                       return t;
+               }
+
+               public int addSize(int length, int maxSize) {
+                       if(itemsNoID != null) {
+                               for(MessageItem item : itemsNoID) {
+                                       int thisLen = item.getLength();
+                                       length += thisLen;
+                                       if(length > maxSize) return length;
+                               }
+                       }
+                       if(itemsWithID != null) {
+                               for(LinkedList<MessageItem> list : itemsWithID) 
{
+                                       for(MessageItem item : list) {
+                                               int thisLen = item.getLength();
+                                               length += thisLen;
+                                               if(length > maxSize) return 
length;
+                                       }
+                               }
+                       }
+                       return length;
+               }
+
+               /**
+                * @param size
+                * @param minSize
+                * @param maxSize
+                * @param now
+                * @param messages
+                * @return The new size of the packet, multiplied by -1 iff 
there are more
+                * messages but they don't fit.
+                */
+               public int addUrgentMessages(int size, int minSize, int 
maxSize, long now, ArrayList<MessageItem> messages) {
+                       int lists = 0;
+                       if(itemsNoID != null)
+                               lists++;
+                       if(itemsWithID != null)
+                               lists += itemsWithID.size();
+                       for(int i=0;i<lists;i++) {
+                               LinkedList<MessageItem> list;
+                               int l = (i + roundRobinCounter) % lists;
+                               if(itemsNoID != null) {
+                                       if(l == 0) list = itemsNoID;
+                                       else list = itemsWithID.get(l-1);
+                               } else
+                                       list = itemsWithID.get(l);
+                               
+                               while(true) {
+                                       if(list.isEmpty()) continue;
+                                       MessageItem item = list.getFirst();
+                                       if(item.submitted + 
PacketSender.MAX_COALESCING_DELAY <= now) {
+                                               int thisSize = item.getLength();
+                                               if(size + 2 + thisSize > 
maxSize) {
+                                                       if(size == minSize) {
+                                                               // Send it 
anyway, nothing else to send.
+                                                               size += 2 + 
thisSize;
+                                                               
list.removeFirst();
+                                                               
messages.add(item);
+                                                               
roundRobinCounter = i;
+                                                               return size;
+                                                       }
+                                                       return -size;
+                                               }
+                                               size += 2 + thisSize;
+                                               list.removeFirst();
+                                               messages.add(item);
+                                               roundRobinCounter = i;
+                                       } else {
+                                               break;
+                                       }
+                               }
+                       }
+                       return size;
+               }
+               
+               /**
+                * @param size
+                * @param minSize
+                * @param maxSize
+                * @param now
+                * @param messages
+                * @return The new size of the packet, multiplied by -1 iff 
there are more
+                * messages but they don't fit.
+                */
+               public int addMessages(int size, int minSize, int maxSize, long 
now, ArrayList<MessageItem> messages) {
+                       int lists = 0;
+                       if(itemsNoID != null)
+                               lists++;
+                       if(itemsWithID != null)
+                               lists += itemsWithID.size();
+                       for(int i=0;i<lists;i++) {
+                               LinkedList<MessageItem> list;
+                               int l = (i + roundRobinCounter) % lists;
+                               if(itemsNoID != null) {
+                                       if(l == 0) list = itemsNoID;
+                                       else list = itemsWithID.get(l-1);
+                               } else
+                                       list = itemsWithID.get(l);
+                               
+                               while(true) {
+                                       if(list.isEmpty()) continue;
+                                       MessageItem item = list.getFirst();
+                                       int thisSize = item.getLength();
+                                       if(size + 2 + thisSize > maxSize) {
+                                               if(size == minSize) {
+                                                       // Send it anyway, 
nothing else to send.
+                                                       size += 2 + thisSize;
+                                                       list.removeFirst();
+                                                       messages.add(item);
+                                                       roundRobinCounter = i;
+                                                       return size;
+                                               }
+                                               return -size;
+                                       }
+                                       size += 2 + thisSize;
+                                       list.removeFirst();
+                                       messages.add(item);
+                                       roundRobinCounter = i;
+                               }
+                       }
+                       return size;
+               }
+               
+               
+               
+       }

        PeerMessageQueue() {
-               queuesByPriority = new LinkedList[DMT.NUM_PRIORITIES];
+               queuesByPriority = new PrioQueue[DMT.NUM_PRIORITIES];
                for(int i=0;i<queuesByPriority.length;i++)
-                       queuesByPriority[i] = new LinkedList();
+                       queuesByPriority[i] = new PrioQueue();
        }

        public synchronized int queueAndEstimateSize(MessageItem item) {
                enqueuePrioritizedMessageItem(item);
                int x = 0;
-               for(int p=0;p<queuesByPriority.length;p++) {
-                       Iterator i = queuesByPriority[p].iterator();
-                       for(; i.hasNext();) {
-                               MessageItem it = (MessageItem) (i.next());
-                               x += it.getLength() + 2;
-                               if(x > 1024)
-                                       break;
+               for(PrioQueue pq : queuesByPriority) {
+                       if(pq.itemsNoID != null)
+                               for(MessageItem it : pq.itemsNoID) {
+                                       x += it.getLength() + 2;
+                                       if(x > 1024)
+                                               break;
+                               }
+                       if(pq.itemsWithID != null) {
+                               for(LinkedList<MessageItem> q : pq.itemsWithID)
+                                       for(MessageItem it : q) {
+                                               x += it.getLength() + 2;
+                                               if(x > 1024)
+                                                       break;
+                                       }
                        }
                }
                return x;
@@ -39,12 +279,14 @@

        public synchronized long getMessageQueueLengthBytes() {
                long x = 0;
-               for(int p=0;p<queuesByPriority.length;p++) {
-                       Iterator i = queuesByPriority[p].iterator();
-                       for(; i.hasNext();) {
-                               MessageItem it = (MessageItem) (i.next());
-                               x += it.getLength() + 2;
-                       }
+               for(PrioQueue pq : queuesByPriority) {
+                       if(pq.itemsNoID != null)
+                               for(MessageItem it : pq.itemsNoID)
+                                       x += it.getLength() + 2;
+                       if(pq.itemsWithID != null)
+                               for(LinkedList<MessageItem> q : pq.itemsWithID)
+                                       for(MessageItem it : q)
+                                               x += it.getLength() + 2;
                }
                return x;
        }
@@ -70,46 +312,37 @@
                        size += queuesByPriority[i].size();
                MessageItem[] output = new MessageItem[size];
                int ptr = 0;
-               for(int i=0;i<queuesByPriority.length;i++) {
-                       for(Object item : queuesByPriority[i])
-                               output[ptr++] = (MessageItem) item;
+               for(PrioQueue queue : queuesByPriority) {
+                       ptr = queue.addTo(output, ptr);
                }
                return output;
        }

-       public long getNextUrgentTime(long t, long now) {
-               for(LinkedList list : queuesByPriority) {
-                       if(list.isEmpty()) continue;
-                       MessageItem item = (MessageItem) list.getFirst();
-                       if(item.submitted + PacketSender.MAX_COALESCING_DELAY < 
now && Logger.shouldLog(Logger.MINOR, this))
-                               Logger.minor(this, "Message queued to send 
immediately");
-                       t = Math.min(t, item.submitted + 
PacketSender.MAX_COALESCING_DELAY);
+       /**
+        * Get the time at which the next message must be sent. If any message 
is 
+        * overdue, we will return a value less than now, which may not be 
completely 
+        * accurate.
+        * @param t
+        * @param now
+        * @return
+        */
+       public synchronized long getNextUrgentTime(long t, long now) {
+               for(PrioQueue queue : queuesByPriority) {
+                       t = Math.min(t, queue.getNextUrgentTime(t, now));
+                       if(t <= now) return t; // How much in the past doesn't 
matter, as long as it's in the past.
                }
                return t;
        }

        public boolean mustSendNow(long now) {
-               for(int i=0;i<queuesByPriority.length;i++) {
-                       if(!queuesByPriority[i].isEmpty()) {
-                               if(((MessageItem) 
queuesByPriority[i].getFirst()).submitted + 
-                                               
PacketSender.MAX_COALESCING_DELAY <= now) {
-                                       return true;
-                               }
-                       }
-               }
-               return false;
+               return getNextUrgentTime(Long.MAX_VALUE, now) <= now;
        }

        public boolean mustSendSize(int minSize, int maxSize) {
                int length = minSize;
-               for(LinkedList items : queuesByPriority) {
-                       for(Object o : items) {
-                               MessageItem i = (MessageItem) o;
-                               int thisSize = i.getLength();
-                               if(length + 2 + thisSize > maxSize) {
-                                       return true;
-                               } else length += 2 + thisSize;
-                       }
+               for(PrioQueue items : queuesByPriority) {
+                       length = items.addSize(length, maxSize);
+                       if(length > maxSize) return true;
                }
                return false;
        }
@@ -124,41 +357,16 @@
         * @return The new size of the packet, multiplied by -1 iff there are 
more
         * messages but they don't fit.
         */
-       public int addUrgentMessages(int size, long now, int minSize, int 
maxSize, ArrayList<MessageItem> messages) {
+       public synchronized int addUrgentMessages(int size, long now, int 
minSize, int maxSize, ArrayList<MessageItem> messages) {
                boolean gotEnough = false;
                while(!gotEnough) {
-                       // Urgent messages first.
-                       boolean foundNothingUrgent = true;
-                       for(LinkedList items : queuesByPriority) {
-                               while(!gotEnough) {
-                                       if(items.isEmpty()) {
-                                               break;
-                                       }
-                                       MessageItem item = (MessageItem) 
items.getFirst();
-                                       if(item.submitted + 
PacketSender.MAX_COALESCING_DELAY <= now) {
-                                               foundNothingUrgent = false;
-                                               int thisSize = item.getLength();
-                                               if(size + 2 + thisSize > 
maxSize) {
-                                                       if(size == minSize) {
-                                                               // Send it 
anyway, nothing else to send.
-                                                               size += 2 + 
thisSize;
-                                                               
items.removeFirst();
-                                                               
messages.add(item);
-                                                               gotEnough = 
true;
-                                                               break;
-                                                       }
-                                                       gotEnough = true;
-                                                       break; // More items 
won't fit.
-                                               }
-                                               size += 2 + thisSize;
-                                               items.removeFirst();
-                                               messages.add(item);
-                                       } else {
-                                               break;
-                                       }
+                       for(PrioQueue queue : queuesByPriority) {
+                               size = queue.addUrgentMessages(size, minSize, 
maxSize, now, messages);
+                               if(size < 0) {
+                                       size = -size;
+                                       gotEnough = true;
                                }
                        }
-                       if(foundNothingUrgent) break;
                }
                if(gotEnough)
                        return -size;
@@ -179,33 +387,13 @@
        public int addNonUrgentMessages(int size, long now, int minSize, int 
maxSize, ArrayList<MessageItem> messages) {
                boolean gotEnough = false;
                while(!gotEnough) {
-                       boolean foundNothing = true;
-                       for(LinkedList items : queuesByPriority) {
-                               while(!gotEnough) {
-                                       if(items.isEmpty()) {
-                                               break;
-                                       }
-                                       MessageItem item = (MessageItem) 
items.getFirst();
-                                       foundNothing = false;
-                                       int thisSize = item.getLength();
-                                       if(size + 2 + thisSize > maxSize) {
-                                               if(size == minSize) {
-                                                       // Send it anyway, 
nothing else to send.
-                                                       size += 2 + thisSize;
-                                                       items.removeFirst();
-                                                       messages.add(item);
-                                                       gotEnough = true;
-                                                       break;
-                                               }
-                                               gotEnough = true;
-                                               break; // More items won't fit.
-                                       }
-                                       size += 2 + thisSize;
-                                       items.removeFirst();
-                                       messages.add(item);
+                       for(PrioQueue queue : queuesByPriority) {
+                               size = queue.addMessages(size, minSize, 
maxSize, now, messages);
+                               if(size < 0) {
+                                       size = -size;
+                                       gotEnough = true;
                                }
                        }
-                       if(foundNothing) break;
                }
                if(gotEnough)
                        return -size;
@@ -215,3 +403,4 @@


 }
+


Reply via email to