Author: toad
Date: 2008-09-23 23:14:57 +0000 (Tue, 23 Sep 2008)
New Revision: 22784
Modified:
trunk/freenet/src/freenet/node/PeerMessageQueue.java
Log:
Round-robin between different IDs within a priority.
Modified: trunk/freenet/src/freenet/node/PeerMessageQueue.java
===================================================================
--- trunk/freenet/src/freenet/node/PeerMessageQueue.java 2008-09-23 19:11:59 UTC (rev 22783)
+++ trunk/freenet/src/freenet/node/PeerMessageQueue.java 2008-09-23 23:14:57 UTC (rev 22784)
@@ -1,8 +1,10 @@
package freenet.node;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
+import java.util.Map;
import freenet.io.comm.DMT;
import freenet.support.Logger;
@@ -14,24 +16,262 @@
*/
public class PeerMessageQueue {
- private final LinkedList[] queuesByPriority;
+ private final PrioQueue[] queuesByPriority;
+
+ private class PrioQueue {
+ LinkedList<MessageItem> itemsNoID;
+ ArrayList<LinkedList<MessageItem>> itemsWithID;
+ Map<Long, LinkedList<MessageItem>> itemsByID;
+ // Construct structures lazily; we're protected by the overall
+ // synchronized.
+
+ /** 0 = itemsNoID, else 1-N = in itemsWithID[0-(N-1)].
+ * Set when a packet is sent. */
+ private int roundRobinCounter;
+
+ /**
+  * Append an item to the tail of its sub-queue.
+  * Items with a null message, or whose DMT.UID value is not a Long, go to
+  * itemsNoID. Every other item goes to the list for its UID; that list
+  * (and the containing map/array) is created on first use.
+  * @param item The item to enqueue.
+  */
+ public void addLast(MessageItem item) {
+     Object uid = (item.msg == null) ? null : item.msg.getObject(DMT.UID);
+     if(!(uid instanceof Long)) {
+         // No usable ID: falls into the shared ID-less queue.
+         if(itemsNoID == null)
+             itemsNoID = new LinkedList<MessageItem>();
+         itemsNoID.addLast(item);
+         return;
+     }
+     Long key = (Long) uid;
+     if(itemsByID == null) {
+         // The map and the ordered list of queues are always created together.
+         itemsByID = new HashMap<Long, LinkedList<MessageItem>>();
+         itemsWithID = new ArrayList<LinkedList<MessageItem>>();
+     }
+     LinkedList<MessageItem> queue = itemsByID.get(key);
+     if(queue == null) {
+         queue = new LinkedList<MessageItem>();
+         itemsWithID.add(queue);
+         itemsByID.put(key, queue);
+     }
+     queue.addLast(item);
+ }
+
+ /**
+  * Insert an item at the head of its sub-queue.
+  * Items with a null message, or whose DMT.UID value is not a Long, go to
+  * itemsNoID. Every other item goes to the list for its UID; that list
+  * (and the containing map/array) is created on first use.
+  * @param item The item to enqueue at the front.
+  */
+ public void addFirst(MessageItem item) {
+     Object uid = (item.msg == null) ? null : item.msg.getObject(DMT.UID);
+     if(!(uid instanceof Long)) {
+         // No usable ID: falls into the shared ID-less queue.
+         if(itemsNoID == null)
+             itemsNoID = new LinkedList<MessageItem>();
+         itemsNoID.addFirst(item);
+         return;
+     }
+     Long key = (Long) uid;
+     if(itemsByID == null) {
+         // The map and the ordered list of queues are always created together.
+         itemsByID = new HashMap<Long, LinkedList<MessageItem>>();
+         itemsWithID = new ArrayList<LinkedList<MessageItem>>();
+     }
+     LinkedList<MessageItem> queue = itemsByID.get(key);
+     if(queue == null) {
+         queue = new LinkedList<MessageItem>();
+         itemsWithID.add(queue);
+         itemsByID.put(key, queue);
+     }
+     queue.addFirst(item);
+ }
+
+ /**
+  * @return The number of items queued at this priority, summed over
+  * itemsNoID and every per-ID list. Structures that have not been
+  * created yet count as empty.
+  */
+ public int size() {
+     int total = (itemsNoID == null) ? 0 : itemsNoID.size();
+     if(itemsWithID == null)
+         return total;
+     for(LinkedList<MessageItem> perId : itemsWithID)
+         total += perId.size();
+     return total;
+ }
+
+ /**
+  * Copy every queued item into an array, ID-less items first and then
+  * each per-ID list in itemsWithID order.
+  * @param output Destination array.
+  * @param ptr Index in output at which to start writing.
+  * @return The index just past the last element written.
+  */
+ public int addTo(MessageItem[] output, int ptr) {
+     int pos = ptr;
+     if(itemsNoID != null) {
+         for(MessageItem queued : itemsNoID) {
+             output[pos] = queued;
+             pos++;
+         }
+     }
+     if(itemsWithID == null)
+         return pos;
+     for(LinkedList<MessageItem> perId : itemsWithID) {
+         for(MessageItem queued : perId) {
+             output[pos] = queued;
+             pos++;
+         }
+     }
+     return pos;
+ }
+
+ /**
+  * Fold the send deadlines of this priority's sub-queues into t.
+  * Only the head item of each sub-queue is examined; its deadline is
+  * submitted + PacketSender.MAX_COALESCING_DELAY.
+  * @param t The earliest deadline found so far.
+  * @param now The current time. As soon as the running minimum is
+  * <= now it is returned without looking at further sub-queues.
+  * @return The minimum of t and the head deadlines inspected.
+  */
+ public long getNextUrgentTime(long t, long now) {
+     // In the code shown, sub-queues are emptied with removeFirst() but never
+     // discarded, so a non-null list can be empty. getFirst() on an empty
+     // LinkedList throws NoSuchElementException, hence the isEmpty() guards.
+     if(itemsNoID != null && !itemsNoID.isEmpty()) {
+         t = Math.min(t, itemsNoID.getFirst().submitted + PacketSender.MAX_COALESCING_DELAY);
+         if(t <= now) return t;
+     }
+     if(itemsWithID != null) {
+         for(LinkedList<MessageItem> items : itemsWithID) {
+             if(items.isEmpty()) continue;
+             t = Math.min(t, items.getFirst().submitted + PacketSender.MAX_COALESCING_DELAY);
+             if(t <= now) return t;
+         }
+     }
+     return t;
+ }
+
+ /**
+  * Add the on-the-wire size of the queued items to a running total,
+  * stopping as soon as the total exceeds maxSize.
+  * Each item is charged getLength() plus 2 bytes, the same per-message
+  * cost used when messages are actually packed (size + 2 + thisSize) and
+  * by the size check this method replaces.
+  * @param length Running total before this queue is counted.
+  * @param maxSize Threshold; counting stops once the total is above it.
+  * @return The updated total. A value greater than maxSize means the
+  * threshold was crossed and the remaining items were not counted.
+  */
+ public int addSize(int length, int maxSize) {
+     if(itemsNoID != null) {
+         for(MessageItem item : itemsNoID) {
+             length += 2 + item.getLength();
+             if(length > maxSize) return length;
+         }
+     }
+     if(itemsWithID != null) {
+         for(LinkedList<MessageItem> list : itemsWithID) {
+             for(MessageItem item : list) {
+                 length += 2 + item.getLength();
+                 if(length > maxSize) return length;
+             }
+         }
+     }
+     return length;
+ }
+
+ /**
+  * Move urgent messages from this priority's sub-queues into a packet.
+  * A message is urgent when submitted + PacketSender.MAX_COALESCING_DELAY
+  * is <= now. Sub-queues are visited in rotation starting at
+  * roundRobinCounter; from each one, urgent head items are taken until
+  * the head is not urgent, the list is empty, or the packet is full.
+  * @param size Current size of the packet.
+  * @param minSize Packet size at which nothing has been added yet. If
+  * size == minSize, a message that does not fit is sent anyway.
+  * @param maxSize Maximum packet size.
+  * @param now The current time.
+  * @param messages Receives the items removed from the queue.
+  * @return The new size of the packet, multiplied by -1 iff there are more
+  * messages but they don't fit.
+  */
+ public int addUrgentMessages(int size, int minSize, int maxSize, long now, ArrayList<MessageItem> messages) {
+     int lists = 0;
+     if(itemsNoID != null)
+         lists++;
+     if(itemsWithID != null)
+         lists += itemsWithID.size();
+     for(int i=0;i<lists;i++) {
+         LinkedList<MessageItem> list;
+         int l = (i + roundRobinCounter) % lists;
+         if(itemsNoID != null) {
+             if(l == 0) list = itemsNoID;
+             else list = itemsWithID.get(l-1);
+         } else
+             list = itemsWithID.get(l);
+
+         // Loop on !isEmpty(): the previous "continue" inside while(true)
+         // re-entered this same loop and spun forever on an empty list.
+         while(!list.isEmpty()) {
+             MessageItem item = list.getFirst();
+             if(item.submitted + PacketSender.MAX_COALESCING_DELAY > now)
+                 break; // Head is not urgent yet; move on to the next list.
+             int thisSize = item.getLength();
+             if(size + 2 + thisSize > maxSize) {
+                 if(size == minSize) {
+                     // Send it anyway, nothing else to send.
+                     size += 2 + thisSize;
+                     list.removeFirst();
+                     messages.add(item);
+                     roundRobinCounter = l;
+                     return size;
+                 }
+                 return -size;
+             }
+             size += 2 + thisSize;
+             list.removeFirst();
+             messages.add(item);
+             // Store the absolute list index, not the loop offset i: the
+             // counter is added to i on the next call, so it must be a position.
+             roundRobinCounter = l;
+         }
+     }
+     return size;
+ }
+
+ /**
+  * Move messages from this priority's sub-queues into a packet,
+  * regardless of how long they have been queued. Sub-queues are visited
+  * in rotation starting at roundRobinCounter; each is drained from the
+  * head until it is empty or the packet is full.
+  * @param size Current size of the packet.
+  * @param minSize Packet size at which nothing has been added yet. If
+  * size == minSize, a message that does not fit is sent anyway.
+  * @param maxSize Maximum packet size.
+  * @param now The current time (not used by this method).
+  * @param messages Receives the items removed from the queue.
+  * @return The new size of the packet, multiplied by -1 iff there are more
+  * messages but they don't fit.
+  */
+ public int addMessages(int size, int minSize, int maxSize, long now, ArrayList<MessageItem> messages) {
+     int lists = 0;
+     if(itemsNoID != null)
+         lists++;
+     if(itemsWithID != null)
+         lists += itemsWithID.size();
+     for(int i=0;i<lists;i++) {
+         LinkedList<MessageItem> list;
+         int l = (i + roundRobinCounter) % lists;
+         if(itemsNoID != null) {
+             if(l == 0) list = itemsNoID;
+             else list = itemsWithID.get(l-1);
+         } else
+             list = itemsWithID.get(l);
+
+         // Loop on !isEmpty(): the previous "continue" inside while(true)
+         // re-entered this same loop, which then never terminated once a
+         // list had been drained.
+         while(!list.isEmpty()) {
+             MessageItem item = list.getFirst();
+             int thisSize = item.getLength();
+             if(size + 2 + thisSize > maxSize) {
+                 if(size == minSize) {
+                     // Send it anyway, nothing else to send.
+                     size += 2 + thisSize;
+                     list.removeFirst();
+                     messages.add(item);
+                     roundRobinCounter = l;
+                     return size;
+                 }
+                 return -size;
+             }
+             size += 2 + thisSize;
+             list.removeFirst();
+             messages.add(item);
+             // Store the absolute list index, not the loop offset i: the
+             // counter is added to i on the next call, so it must be a position.
+             roundRobinCounter = l;
+         }
+     }
+     return size;
+ }
+
+
+
+ }
/**
 * Create the queue with one empty per-priority queue for each of the
 * DMT.NUM_PRIORITIES priorities. This revision changes the element type
 * from a raw LinkedList to PrioQueue.
 */
PeerMessageQueue() {
- queuesByPriority = new LinkedList[DMT.NUM_PRIORITIES];
+ queuesByPriority = new PrioQueue[DMT.NUM_PRIORITIES];
for(int i=0;i<queuesByPriority.length;i++)
- queuesByPriority[i] = new LinkedList();
+ queuesByPriority[i] = new PrioQueue();
}
public synchronized int queueAndEstimateSize(MessageItem item) {
enqueuePrioritizedMessageItem(item);
int x = 0;
- for(int p=0;p<queuesByPriority.length;p++) {
- Iterator i = queuesByPriority[p].iterator();
- for(; i.hasNext();) {
- MessageItem it = (MessageItem) (i.next());
- x += it.getLength() + 2;
- if(x > 1024)
- break;
+ for(PrioQueue pq : queuesByPriority) {
+ if(pq.itemsNoID != null)
+ for(MessageItem it : pq.itemsNoID) {
+ x += it.getLength() + 2;
+ if(x > 1024)
+ break;
+ }
+ if(pq.itemsWithID != null) {
+ for(LinkedList<MessageItem> q : pq.itemsWithID)
+ for(MessageItem it : q) {
+ x += it.getLength() + 2;
+ if(x > 1024)
+ break;
+ }
}
}
return x;
@@ -39,12 +279,14 @@
/**
 * Total size of everything currently queued, over all priorities.
 * Each item contributes getLength() + 2. After this change the items are
 * read from each PrioQueue's itemsNoID list and from every list in
 * itemsWithID; either may be null if it has not been created yet.
 * @return The summed size of all queued items.
 */
public synchronized long getMessageQueueLengthBytes() {
long x = 0;
- for(int p=0;p<queuesByPriority.length;p++) {
- Iterator i = queuesByPriority[p].iterator();
- for(; i.hasNext();) {
- MessageItem it = (MessageItem) (i.next());
- x += it.getLength() + 2;
- }
+ for(PrioQueue pq : queuesByPriority) {
+ if(pq.itemsNoID != null)
+ for(MessageItem it : pq.itemsNoID)
+ x += it.getLength() + 2;
+ if(pq.itemsWithID != null)
+ for(LinkedList<MessageItem> q : pq.itemsWithID)
+ for(MessageItem it : q)
+ x += it.getLength() + 2;
}
return x;
}
@@ -70,46 +312,37 @@
size += queuesByPriority[i].size();
MessageItem[] output = new MessageItem[size];
int ptr = 0;
- for(int i=0;i<queuesByPriority.length;i++) {
- for(Object item : queuesByPriority[i])
- output[ptr++] = (MessageItem) item;
+ for(PrioQueue queue : queuesByPriority) {
+ ptr = queue.addTo(output, ptr);
}
return output;
}
- public long getNextUrgentTime(long t, long now) {
- for(LinkedList list : queuesByPriority) {
- if(list.isEmpty()) continue;
- MessageItem item = (MessageItem) list.getFirst();
- if(item.submitted + PacketSender.MAX_COALESCING_DELAY <
now && Logger.shouldLog(Logger.MINOR, this))
- Logger.minor(this, "Message queued to send
immediately");
- t = Math.min(t, item.submitted +
PacketSender.MAX_COALESCING_DELAY);
+ /**
+ * Get the time at which the next message must be sent. If any message
+ * is overdue, the value returned is no later than now; it is not
+ * necessarily the earliest deadline, because the scan stops at the
+ * first per-priority queue that brings the result to now or earlier.
+ * @param t The earliest deadline known so far; the result never exceeds it.
+ * @param now The current time, used only for the early exit.
+ * @return The minimum of t and the values reported by the per-priority
+ * queues that were examined.
+ */
+ public synchronized long getNextUrgentTime(long t, long now) {
+ for(PrioQueue queue : queuesByPriority) {
+ t = Math.min(t, queue.getNextUrgentTime(t, now));
+ if(t <= now) return t; // How much in the past doesn't
matter, as long as it's in the past.
}
return t;
}
/**
 * Whether something queued has reached its send deadline.
 * After this change the check is delegated to getNextUrgentTime, seeded
 * with Long.MAX_VALUE so that only queued items can lower the result.
 * @param now The current time.
 * @return true if getNextUrgentTime(Long.MAX_VALUE, now) is <= now.
 */
public boolean mustSendNow(long now) {
- for(int i=0;i<queuesByPriority.length;i++) {
- if(!queuesByPriority[i].isEmpty()) {
- if(((MessageItem)
queuesByPriority[i].getFirst()).submitted +
-
PacketSender.MAX_COALESCING_DELAY <= now) {
- return true;
- }
- }
- }
- return false;
+ return getNextUrgentTime(Long.MAX_VALUE, now) <= now;
}
/**
 * Whether the queued messages would push a packet past maxSize.
 * The running total starts at minSize and each per-priority queue adds
 * to it through addSize; the scan stops at the first queue that takes
 * the total above maxSize.
 * @param minSize Starting value of the running total.
 * @param maxSize Threshold the total is compared against.
 * @return true if the total exceeds maxSize, false otherwise.
 */
public boolean mustSendSize(int minSize, int maxSize) {
int length = minSize;
- for(LinkedList items : queuesByPriority) {
- for(Object o : items) {
- MessageItem i = (MessageItem) o;
- int thisSize = i.getLength();
- if(length + 2 + thisSize > maxSize) {
- return true;
- } else length += 2 + thisSize;
- }
+ for(PrioQueue items : queuesByPriority) {
+ length = items.addSize(length, maxSize);
+ if(length > maxSize) return true;
}
return false;
}
@@ -124,41 +357,16 @@
* @return The new size of the packet, multiplied by -1 iff there are
more
* messages but they don't fit.
*/
- public int addUrgentMessages(int size, long now, int minSize, int
maxSize, ArrayList<MessageItem> messages) {
+ public synchronized int addUrgentMessages(int size, long now, int
minSize, int maxSize, ArrayList<MessageItem> messages) {
boolean gotEnough = false;
while(!gotEnough) {
- // Urgent messages first.
- boolean foundNothingUrgent = true;
- for(LinkedList items : queuesByPriority) {
- while(!gotEnough) {
- if(items.isEmpty()) {
- break;
- }
- MessageItem item = (MessageItem)
items.getFirst();
- if(item.submitted +
PacketSender.MAX_COALESCING_DELAY <= now) {
- foundNothingUrgent = false;
- int thisSize = item.getLength();
- if(size + 2 + thisSize >
maxSize) {
- if(size == minSize) {
- // Send it
anyway, nothing else to send.
- size += 2 +
thisSize;
-
items.removeFirst();
-
messages.add(item);
- gotEnough =
true;
- break;
- }
- gotEnough = true;
- break; // More items
won't fit.
- }
- size += 2 + thisSize;
- items.removeFirst();
- messages.add(item);
- } else {
- break;
- }
+ for(PrioQueue queue : queuesByPriority) {
+ size = queue.addUrgentMessages(size, minSize,
maxSize, now, messages);
+ if(size < 0) {
+ size = -size;
+ gotEnough = true;
}
}
- if(foundNothingUrgent) break;
}
if(gotEnough)
return -size;
@@ -179,33 +387,13 @@
public int addNonUrgentMessages(int size, long now, int minSize, int
maxSize, ArrayList<MessageItem> messages) {
boolean gotEnough = false;
while(!gotEnough) {
- boolean foundNothing = true;
- for(LinkedList items : queuesByPriority) {
- while(!gotEnough) {
- if(items.isEmpty()) {
- break;
- }
- MessageItem item = (MessageItem)
items.getFirst();
- foundNothing = false;
- int thisSize = item.getLength();
- if(size + 2 + thisSize > maxSize) {
- if(size == minSize) {
- // Send it anyway,
nothing else to send.
- size += 2 + thisSize;
- items.removeFirst();
- messages.add(item);
- gotEnough = true;
- break;
- }
- gotEnough = true;
- break; // More items won't fit.
- }
- size += 2 + thisSize;
- items.removeFirst();
- messages.add(item);
+ for(PrioQueue queue : queuesByPriority) {
+ size = queue.addMessages(size, minSize,
maxSize, now, messages);
+ if(size < 0) {
+ size = -size;
+ gotEnough = true;
}
}
- if(foundNothing) break;
}
if(gotEnough)
return -size;
@@ -215,3 +403,4 @@
}
+