Author: toad
Date: 2008-09-23 19:08:07 +0000 (Tue, 23 Sep 2008)
New Revision: 22780
Added:
trunk/freenet/src/freenet/node/PeerMessageQueue.java
Modified:
trunk/freenet/src/freenet/node/PeerNode.java
Log:
Split PeerMessageQueue into a separate class.
Added: trunk/freenet/src/freenet/node/PeerMessageQueue.java
===================================================================
--- trunk/freenet/src/freenet/node/PeerMessageQueue.java
(rev 0)
+++ trunk/freenet/src/freenet/node/PeerMessageQueue.java 2008-09-23
19:08:07 UTC (rev 22780)
@@ -0,0 +1,216 @@
+package freenet.node;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.LinkedList;
+
+import freenet.io.comm.DMT;
+import freenet.support.Logger;
+
+/**
+ * Queue of messages to send to a node. Ordered first by priority then by time.
+ * Will soon be round-robin between different transfers/UIDs/clients too.
+ * @author Matthew Toseland <toad at amphibian.dyndns.org> (0xE43DA450)
+ */
+public class PeerMessageQueue {
+
+ /**
+  * One list of queued messages per priority, indexed by the value returned
+  * from MessageItem.getPriority(). Every method of this class walks the
+  * array from index 0 upwards.
+  */
+ private final LinkedList<MessageItem>[] queuesByPriority;
+
+ /**
+  * Creates an empty queue holding one list for each of the
+  * DMT.NUM_PRIORITIES priority levels.
+  */
+ // Java cannot instantiate an array of a parameterized type, so the raw
+ // array is created and assigned; only MessageItems are ever stored in it.
+ @SuppressWarnings("unchecked")
+ PeerMessageQueue() {
+     queuesByPriority = new LinkedList[DMT.NUM_PRIORITIES];
+     for(int i = 0; i < queuesByPriority.length; i++)
+         queuesByPriority[i] = new LinkedList<MessageItem>();
+ }
+
+ /**
+  * Appends a message to the tail of its priority's list and returns a rough
+  * estimate of how many bytes are now waiting to be sent.
+  * <p>
+  * Each queued item is counted as getLength() + 2 bytes. Counting stops as
+  * soon as the running total exceeds 1024, so the result is exact only when
+  * it is at most 1024; any larger value simply means "more than 1024".
+  *
+  * @param item the message to enqueue
+  * @return the estimated number of queued bytes, not counted any further
+  *         than the first item that takes the total above 1024
+  */
+ public synchronized int queueAndEstimateSize(MessageItem item) {
+     enqueuePrioritizedMessageItem(item);
+     int x = 0;
+     for(int p = 0; p < queuesByPriority.length; p++) {
+         for(Object o : queuesByPriority[p]) {
+             x += ((MessageItem) o).getLength() + 2;
+             // Return instead of break: a break would only leave the inner
+             // loop and go on counting the remaining priority lists.
+             if(x > 1024)
+                 return x;
+         }
+     }
+     return x;
+ }
+
+ /**
+  * Totals the size of everything currently queued, across all priorities.
+  * Every item contributes getLength() + 2 bytes to the total.
+  *
+  * @return the total in bytes; 0 when nothing is queued
+  */
+ public synchronized long getMessageQueueLengthBytes() {
+     long total = 0;
+     for(LinkedList queue : queuesByPriority) {
+         for(Object queued : queue)
+             total += ((MessageItem) queued).getLength() + 2;
+     }
+     return total;
+ }
+
+ /**
+  * Appends a message to the tail of the list for its priority, behind
+  * everything already queued at that priority.
+  *
+  * @param addMe the message to enqueue; its priority selects the list
+  */
+ private synchronized void enqueuePrioritizedMessageItem(MessageItem addMe) {
+     queuesByPriority[addMe.getPriority()].addLast(addMe);
+ }
+
+ /**
+  * Counterpart of enqueuePrioritizedMessageItem that inserts at the head of
+  * the priority's list instead of the tail, placing the message ahead of
+  * everything already queued at the same priority.
+  *
+  * @param addMe the message to put at the front; its priority selects the list
+  */
+ synchronized void pushfrontPrioritizedMessageItem(MessageItem addMe) {
+     queuesByPriority[addMe.getPriority()].addFirst(addMe);
+ }
+
+ /**
+  * Copies every queued message into one array: the priority lists are
+  * visited in index order and each list is copied from head to tail.
+  * NOTE(review): despite the name, nothing is removed from the queue here --
+  * confirm that callers expect the items to stay queued.
+  *
+  * @return the queued messages; an empty array when nothing is queued
+  */
+ public synchronized MessageItem[] grabQueuedMessageItems() {
+     int total = 0;
+     for(LinkedList queue : queuesByPriority)
+         total += queue.size();
+     final MessageItem[] snapshot = new MessageItem[total];
+     int next = 0;
+     for(LinkedList queue : queuesByPriority) {
+         for(Object queued : queue) {
+             snapshot[next] = (MessageItem) queued;
+             next++;
+         }
+     }
+     return snapshot;
+ }
+
+ public long getNextUrgentTime(long t, long now) {
+ for(LinkedList list : queuesByPriority) {
+ if(list.isEmpty()) continue;
+ MessageItem item = (MessageItem) list.getFirst();
+ if(item.submitted + PacketSender.MAX_COALESCING_DELAY <
now && Logger.shouldLog(Logger.MINOR, this))
+ Logger.minor(this, "Message queued to send
immediately");
+ t = Math.min(t, item.submitted +
PacketSender.MAX_COALESCING_DELAY);
+ }
+ return t;
+ }
+
+ public boolean mustSendNow(long now) {
+ for(int i=0;i<queuesByPriority.length;i++) {
+ if(!queuesByPriority[i].isEmpty()) {
+ if(((MessageItem)
queuesByPriority[i].getFirst()).submitted +
+
PacketSender.MAX_COALESCING_DELAY <= now) {
+ return true;
+ }
+ }
+ }
+ return false;
+ }
+
+ /**
+  * Tells whether the queued messages no longer fit into a single packet,
+  * meaning a packet should be sent without waiting for more data.
+  * <p>
+  * Starting from minSize, the length of each queued message is added in
+  * queue order; as soon as the next message would take the total above
+  * maxSize the answer is true. Message lengths come from getLength(), the
+  * same measure addUrgentMessages and addNonUrgentMessages use when they
+  * fill a packet.
+  * <p>
+  * This method takes no lock itself; the PeerNode call site invokes it while
+  * synchronized on this queue.
+  *
+  * @param minSize the size to start counting from
+  * @param maxSize the largest total that still fits
+  * @return true if the queued messages, added to minSize, exceed maxSize
+  */
+ public boolean mustSendSize(int minSize, int maxSize) {
+     int length = minSize;
+     for(LinkedList items : queuesByPriority) {
+         for(Object o : items) {
+             // Measure the message itself. Comparing length + maxSize with
+             // maxSize would be true for any positive length, i.e. whenever
+             // anything at all is queued.
+             int thisSize = ((MessageItem) o).getLength();
+             if(length + thisSize > maxSize)
+                 return true;
+             length += thisSize;
+         }
+     }
+     return false;
+ }
+
+ /**
+ * Add urgent messages to the queue.
+ * @param size
+ * @param now
+ * @param minSize
+ * @param maxSize
+ * @param messages
+ * @return The new size of the packet, multiplied by -1 iff there are
more
+ * messages but they don't fit.
+ */
+ public int addUrgentMessages(int size, long now, int minSize, int
maxSize, ArrayList<MessageItem> messages) {
+ boolean gotEnough = false;
+ while(!gotEnough) {
+ // Urgent messages first.
+ boolean foundNothingUrgent = true;
+ for(LinkedList items : queuesByPriority) {
+ while(!gotEnough) {
+ if(items.isEmpty()) {
+ break;
+ }
+ MessageItem item = (MessageItem)
items.getFirst();
+ if(item.submitted +
PacketSender.MAX_COALESCING_DELAY <= now) {
+ foundNothingUrgent = false;
+ int thisSize = item.getLength();
+ if(size + thisSize > maxSize) {
+ if(size == minSize) {
+ // Send it
anyway, nothing else to send.
+ size +=
thisSize;
+
items.removeFirst();
+
messages.add(item);
+ gotEnough =
true;
+ break;
+ }
+ gotEnough = true;
+ break; // More items
won't fit.
+ }
+ size += thisSize;
+ items.removeFirst();
+ messages.add(item);
+ } else {
+ break;
+ }
+ }
+ }
+ if(foundNothingUrgent) break;
+ }
+ if(gotEnough)
+ return -size;
+ else
+ return size;
+ }
+
+ /**
+ * Add non-urgent messages to the queue.
+ * @param size
+ * @param now
+ * @param minSize
+ * @param maxSize
+ * @param messages
+ * @return The new size of the packet, multiplied by -1 iff there are
more
+ * messages but they don't fit.
+ */
+ public int addNonUrgentMessages(int size, long now, int minSize, int
maxSize, ArrayList<MessageItem> messages) {
+ boolean gotEnough = false;
+ while(!gotEnough) {
+ boolean foundNothing = true;
+ for(LinkedList items : queuesByPriority) {
+ while(!gotEnough) {
+ if(items.isEmpty()) {
+ break;
+ }
+ MessageItem item = (MessageItem)
items.getFirst();
+ foundNothing = false;
+ int thisSize = item.getLength();
+ if(size + thisSize > maxSize) {
+ if(size == minSize) {
+ // Send it anyway,
nothing else to send.
+ size += thisSize;
+ items.removeFirst();
+ messages.add(item);
+ gotEnough = true;
+ break;
+ }
+ gotEnough = true;
+ break; // More items won't fit.
+ }
+ size += thisSize;
+ items.removeFirst();
+ messages.add(item);
+ }
+ }
+ if(foundNothing) break;
+ }
+ if(gotEnough)
+ return -size;
+ else
+ return size;
+ }
+
+
+}
Modified: trunk/freenet/src/freenet/node/PeerNode.java
===================================================================
--- trunk/freenet/src/freenet/node/PeerNode.java 2008-09-23 19:00:00 UTC
(rev 22779)
+++ trunk/freenet/src/freenet/node/PeerNode.java 2008-09-23 19:08:07 UTC
(rev 22780)
@@ -208,7 +208,7 @@
final PeerManager peers;
/** MessageItem's to send ASAP.
* LOCKING: Lock on self, always take that lock last. Sometimes used
inside PeerNode.this lock. */
- private final LinkedList[] messagesToSendNow;
+ private final PeerMessageQueue messageQueue;
/** When did we last receive a SwapRequest? */
private long timeLastReceivedSwapRequest;
/** Average interval between SwapRequest's */
@@ -589,10 +589,8 @@
// Not connected yet; need to handshake
isConnected = false;
- messagesToSendNow = new LinkedList[DMT.NUM_PRIORITIES];
- for(int i=0;i<messagesToSendNow.length;i++)
- messagesToSendNow[i] = new LinkedList();
-
+ messageQueue = new PeerMessageQueue();
+
decrementHTLAtMaximum = node.random.nextFloat() <
Node.DECREMENT_AT_MAX_PROB;
decrementHTLAtMinimum = node.random.nextFloat() <
Node.DECREMENT_AT_MIN_PROB;
@@ -1035,19 +1033,7 @@
MessageItem item = new MessageItem(msg, cb == null ? null : new
AsyncMessageCallback[]{cb}, ctr, this);
long now = System.currentTimeMillis();
reportBackoffStatus(now);
- int x = 0;
- synchronized(messagesToSendNow) {
- enqueuePrioritizedMessageItem(item);
- for(int p=0;p<messagesToSendNow.length;p++) {
- Iterator i = messagesToSendNow[p].iterator();
- for(; i.hasNext();) {
- MessageItem it = (MessageItem) (i.next());
- x += it.getLength() + 2;
- if(x > 1024)
- break;
- }
- }
- }
+ int x = messageQueue.queueAndEstimateSize(item);
if(x > 1024 || !node.enablePacketCoalescing) {
// If there is a packet's worth to send, wake up the
packetsender.
node.ps.wakeUp();
@@ -1058,39 +1044,10 @@
}
public long getMessageQueueLengthBytes() {
- long x = 0;
- synchronized(messagesToSendNow) {
- for(int p=0;p<messagesToSendNow.length;p++) {
- Iterator i = messagesToSendNow[p].iterator();
- for(; i.hasNext();) {
- MessageItem it = (MessageItem) (i.next());
- x += it.getLength() + 2;
- }
- }
- }
- return x;
+ return messageQueue.getMessageQueueLengthBytes();
}
- private void enqueuePrioritizedMessageItem(MessageItem addMe) {
- synchronized (messagesToSendNow) {
- //Assume it goes on the end, both the common case
- short prio = addMe.getPriority();
- messagesToSendNow[prio].addLast(addMe);
- }
- }
-
/**
- * like enqueuePrioritizedMessageItem, but adds it to the front of
those in the same priority.
- */
- private void pushfrontPrioritizedMessageItem(MessageItem addMe) {
- synchronized (messagesToSendNow) {
- //Assume it goes on the front
- short prio = addMe.getPriority();
- messagesToSendNow[prio].addFirst(addMe);
- }
- }
-
- /**
* Returns the number of milliseconds that it is estimated to take to
transmit the currently queued packets.
*/
public long getProbableSendQueueTime() {
@@ -1256,18 +1213,7 @@
* Message's.
*/
public MessageItem[] grabQueuedMessageItems() {
- synchronized(messagesToSendNow) {
- int size = 0;
- for(int i=0;i<messagesToSendNow.length;i++)
- size += messagesToSendNow[i].size();
- MessageItem[] output = new MessageItem[size];
- int ptr = 0;
- for(int i=0;i<messagesToSendNow.length;i++) {
- for(Object item : messagesToSendNow[i])
- output[ptr++] = (MessageItem) item;
- }
- return output;
- }
+ return messageQueue.grabQueuedMessageItems();
}
public void requeueMessageItems(MessageItem[] messages, int offset, int
length, boolean dontLog) {
@@ -1296,10 +1242,10 @@
Logger.normal(this, "Requeueing " +
messages.length + " messages" + reasonWrapper + " on " + this +
rateLimitWrapper);
}
}
- synchronized(messagesToSendNow) {
+ synchronized(messageQueue) {
for(int i = offset+length-1; i >= offset; i--)
if(messages[i] != null)
-
pushfrontPrioritizedMessageItem(messages[i]);
+
messageQueue.pushfrontPrioritizedMessageItem(messages[i]);
}
}
@@ -1328,15 +1274,7 @@
if(kt.hasPacketsToResend()) return now;
}
}
- synchronized(messagesToSendNow) {
- for(LinkedList list : messagesToSendNow) {
- if(list.isEmpty()) continue;
- MessageItem item = (MessageItem)
list.getFirst();
- if(item.submitted +
PacketSender.MAX_COALESCING_DELAY < now && logMINOR)
- Logger.minor(this, "Message queued to
send immediately");
- t = Math.min(t, item.submitted +
PacketSender.MAX_COALESCING_DELAY);
- }
- }
+ t = messageQueue.getNextUrgentTime(t, now);
return t;
}
@@ -4140,103 +4078,34 @@
ArrayList<MessageItem> messages = new
ArrayList<MessageItem>(10);
- synchronized(messagesToSendNow) {
+ synchronized(messageQueue) {
// Any urgent messages to send now?
if(!mustSend) {
- for(int i=0;i<messagesToSendNow.length;i++) {
- if(!messagesToSendNow[i].isEmpty()) {
- if(((MessageItem)
messagesToSendNow[i].getFirst()).submitted +
-
PacketSender.MAX_COALESCING_DELAY <= now) {
- mustSend = true;
- break;
- }
- }
- }
+ if(messageQueue.mustSendNow(now))
+ mustSend = true;
}
if(!mustSend) {
// What about total length?
- int length = minSize;
- for(LinkedList items : messagesToSendNow) {
- for(Object o : items) {
- MessageItem i = (MessageItem) o;
- if(length + maxSize > maxSize) {
- mustSend = true;
- break;
- } else length += maxSize;
- }
- }
+ if(messageQueue.mustSendSize(minSize, maxSize))
+ mustSend = true;
}
if(mustSend) {
int size = minSize;
boolean gotEnough = false;
- while(!gotEnough) {
- // Urgent messages first.
- boolean foundNothingUrgent = true;
- for(LinkedList items :
messagesToSendNow) {
- while(!gotEnough) {
- if(items.isEmpty()) {
- break;
- }
- MessageItem item =
(MessageItem) items.getFirst();
- if(item.submitted +
PacketSender.MAX_COALESCING_DELAY <= now) {
-
foundNothingUrgent = false;
- int thisSize =
item.getLength();
- if(size +
thisSize > maxSize) {
- if(size
== minSize) {
-
// Send it anyway, nothing else to send.
-
size += thisSize;
-
items.removeFirst();
-
messages.add(item);
-
gotEnough = true;
-
break;
- }
-
gotEnough = true;
- break;
// More items won't fit.
- }
- size +=
thisSize;
-
items.removeFirst();
-
messages.add(item);
- } else {
- break;
- }
- }
- }
- if(foundNothingUrgent) break;
+ size = messageQueue.addUrgentMessages(size,
now, minSize, maxSize, messages);
+ if(size < 0) {
+ gotEnough = true;
+ size = -size;
}
-
+
// Now the not-so-urgent messages.
- while(!gotEnough) {
- boolean foundNothing = true;
- for(LinkedList items :
messagesToSendNow) {
- while(!gotEnough) {
- if(items.isEmpty()) {
- break;
- }
- MessageItem item =
(MessageItem) items.getFirst();
- foundNothing = false;
- int thisSize =
item.getLength();
- if(size + thisSize >
maxSize) {
- if(size ==
minSize) {
- // Send
it anyway, nothing else to send.
- size +=
thisSize;
-
items.removeFirst();
-
messages.add(item);
-
gotEnough = true;
- break;
- }
- gotEnough =
true;
- break; // More
items won't fit.
- }
- size += thisSize;
- items.removeFirst();
- messages.add(item);
- }
- }
- if(foundNothing) break;
+ if(!gotEnough) {
+ size =
messageQueue.addNonUrgentMessages(size, now, minSize, maxSize, messages);
+ if(size < 0) size = -size;
}
}