Author: toad
Date: 2008-09-23 19:08:07 +0000 (Tue, 23 Sep 2008)
New Revision: 22780

Added:
   trunk/freenet/src/freenet/node/PeerMessageQueue.java
Modified:
   trunk/freenet/src/freenet/node/PeerNode.java
Log:
Split PeerMessageQueue into a separate class.


Added: trunk/freenet/src/freenet/node/PeerMessageQueue.java
===================================================================
--- trunk/freenet/src/freenet/node/PeerMessageQueue.java                        
        (rev 0)
+++ trunk/freenet/src/freenet/node/PeerMessageQueue.java        2008-09-23 
19:08:07 UTC (rev 22780)
@@ -0,0 +1,216 @@
+package freenet.node;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.LinkedList;
+
+import freenet.io.comm.DMT;
+import freenet.support.Logger;
+
+/**
+ * Queue of messages to send to a node. Ordered first by priority then by time.
+ * Will soon be round-robin between different transfers/UIDs/clients too.
+ * <p>
+ * LOCKING: every method that reads or modifies the queues synchronizes on this
+ * object. A caller that needs several calls to be atomic can wrap them in
+ * synchronized(queue); Java monitors are reentrant, so the nested locking is
+ * harmless.
+ * @author Matthew Toseland <toad at amphibian.dyndns.org> (0xE43DA450)
+ */
+public class PeerMessageQueue {
+
+       /** One list per priority, indexed by MessageItem.getPriority(). */
+       private final LinkedList<MessageItem>[] queuesByPriority;
+
+       /** Create an empty queue with one list for each of DMT.NUM_PRIORITIES. */
+       @SuppressWarnings("unchecked") // Java cannot create a generic array directly.
+       PeerMessageQueue() {
+               queuesByPriority = new LinkedList[DMT.NUM_PRIORITIES];
+               for(int i=0;i<queuesByPriority.length;i++)
+                       queuesByPriority[i] = new LinkedList<MessageItem>();
+       }
+
+       /**
+        * Append an item to the end of the list for its priority, then estimate
+        * how many bytes are queued. Each item counts as getLength() + 2.
+        * Counting within a priority stops as soon as the running total exceeds
+        * 1024, so the result is an estimate rather than an exact total; it is
+        * intended to be compared against 1024.
+        * @param item The message to queue.
+        * @return The estimated number of queued bytes.
+        */
+       public synchronized int queueAndEstimateSize(MessageItem item) {
+               enqueuePrioritizedMessageItem(item);
+               int x = 0;
+               for(LinkedList<MessageItem> queue : queuesByPriority) {
+                       for(MessageItem it : queue) {
+                               x += it.getLength() + 2;
+                               if(x > 1024)
+                                       break;
+                       }
+               }
+               return x;
+       }
+
+       /**
+        * @return The total size of all queued messages, counting each item as
+        * getLength() + 2 bytes.
+        */
+       public synchronized long getMessageQueueLengthBytes() {
+               long x = 0;
+               for(LinkedList<MessageItem> queue : queuesByPriority) {
+                       for(MessageItem it : queue)
+                               x += it.getLength() + 2;
+               }
+               return x;
+       }
+
+       /**
+        * Add an item to the end of the list for its priority.
+        */
+       private synchronized void enqueuePrioritizedMessageItem(MessageItem addMe) {
+               //Assume it goes on the end, both the common case
+               short prio = addMe.getPriority();
+               queuesByPriority[prio].addLast(addMe);
+       }
+
+       /**
+        * like enqueuePrioritizedMessageItem, but adds it to the front of those in the same priority.
+        */
+       synchronized void pushfrontPrioritizedMessageItem(MessageItem addMe) {
+               //Assume it goes on the front
+               short prio = addMe.getPriority();
+               queuesByPriority[prio].addFirst(addMe);
+       }
+
+       /**
+        * Copy every queued item into an array, in priority-index order and then
+        * in queue order. The items are NOT removed from the queue.
+        * @return A newly allocated array of the queued items; empty if nothing is
+        * queued.
+        */
+       public synchronized MessageItem[] grabQueuedMessageItems() {
+               int size = 0;
+               for(LinkedList<MessageItem> queue : queuesByPriority)
+                       size += queue.size();
+               MessageItem[] output = new MessageItem[size];
+               int ptr = 0;
+               for(LinkedList<MessageItem> queue : queuesByPriority) {
+                       for(MessageItem item : queue)
+                               output[ptr++] = item;
+               }
+               return output;
+       }
+
+       /**
+        * Work out the earliest time at which a queued message must be sent. Only
+        * the first item of each priority is considered; its deadline is its
+        * submitted time plus PacketSender.MAX_COALESCING_DELAY.
+        * <p>
+        * Synchronized because the caller does not hold the queue lock, and the
+        * lists may be modified concurrently by other threads.
+        * @param t The earliest urgent time found so far by the caller.
+        * @param now The current time.
+        * @return The minimum of t and the deadlines of the head items.
+        */
+       public synchronized long getNextUrgentTime(long t, long now) {
+               for(LinkedList<MessageItem> list : queuesByPriority) {
+                       if(list.isEmpty()) continue;
+                       MessageItem item = list.getFirst();
+                       long deadline = item.submitted + PacketSender.MAX_COALESCING_DELAY;
+                       if(deadline < now && Logger.shouldLog(Logger.MINOR, this))
+                               Logger.minor(this, "Message queued to send immediately");
+                       t = Math.min(t, deadline);
+               }
+               return t;
+       }
+
+       /**
+        * @param now The current time.
+        * @return True if the first item of any priority has been queued for at
+        * least PacketSender.MAX_COALESCING_DELAY.
+        */
+       public synchronized boolean mustSendNow(long now) {
+               for(LinkedList<MessageItem> queue : queuesByPriority) {
+                       if(queue.isEmpty()) continue;
+                       if(queue.getFirst().submitted + PacketSender.MAX_COALESCING_DELAY <= now)
+                               return true;
+               }
+               return false;
+       }
+
+       /**
+        * Is there enough queued data to fill a packet? Starting from minSize,
+        * adds the length of each queued item in turn and reports whether the
+        * total would exceed maxSize.
+        * <p>
+        * This previously compared and added maxSize instead of the item's own
+        * length, which made it return true whenever anything at all was queued
+        * (for any minSize greater than zero).
+        * @param minSize The size of the packet before any messages are added.
+        * @param maxSize The maximum size of the packet.
+        * @return True if the queued messages do not all fit in one packet.
+        */
+       public synchronized boolean mustSendSize(int minSize, int maxSize) {
+               int length = minSize;
+               for(LinkedList<MessageItem> items : queuesByPriority) {
+                       for(MessageItem item : items) {
+                               int thisSize = item.getLength();
+                               if(length + thisSize > maxSize)
+                                       return true;
+                               length += thisSize;
+                       }
+               }
+               return false;
+       }
+
+       /**
+        * Move urgent messages from the queue into the packet being built. A
+        * message is urgent if it has been queued for at least
+        * PacketSender.MAX_COALESCING_DELAY. For each priority, items are taken
+        * from the front until the head is not urgent or the packet is full.
+        * @param size The current size of the packet.
+        * @param now The current time.
+        * @param minSize The size of the packet with no messages in it.
+        * @param maxSize The maximum size of the packet.
+        * @param messages The list to which removed messages are appended.
+        * @return The new size of the packet. It is multiplied by -1 if we stopped
+        * because a message did not fit; this includes the case where the packet
+        * was still at minSize and the oversized message was added anyway.
+        */
+       public synchronized int addUrgentMessages(int size, long now, int minSize, int maxSize, ArrayList<MessageItem> messages) {
+               return moveMessages(size, now, minSize, maxSize, messages, true);
+       }
+
+       /**
+        * Move messages from the queue into the packet being built, regardless of
+        * how long they have been queued. For each priority, items are taken from
+        * the front until the list is empty or the packet is full.
+        * @param size The current size of the packet.
+        * @param now The current time (not used for non-urgent messages).
+        * @param minSize The size of the packet with no messages in it.
+        * @param maxSize The maximum size of the packet.
+        * @param messages The list to which removed messages are appended.
+        * @return The new size of the packet. It is multiplied by -1 if we stopped
+        * because a message did not fit; this includes the case where the packet
+        * was still at minSize and the oversized message was added anyway.
+        */
+       public synchronized int addNonUrgentMessages(int size, long now, int minSize, int maxSize, ArrayList<MessageItem> messages) {
+               return moveMessages(size, now, minSize, maxSize, messages, false);
+       }
+
+       /**
+        * Shared implementation of addUrgentMessages and addNonUrgentMessages.
+        * A single pass over the priorities is sufficient: when the pass ends
+        * without filling the packet, every list is either empty or (in urgent
+        * mode) headed by a non-urgent item, so another pass would move nothing.
+        * @param urgentOnly If true, stop taking from a priority as soon as its
+        * first item is not yet urgent.
+        * @return The new packet size, negated if a message did not fit.
+        */
+       private int moveMessages(int size, long now, int minSize, int maxSize, ArrayList<MessageItem> messages, boolean urgentOnly) {
+               for(LinkedList<MessageItem> items : queuesByPriority) {
+                       while(!items.isEmpty()) {
+                               MessageItem item = items.getFirst();
+                               if(urgentOnly && item.submitted + PacketSender.MAX_COALESCING_DELAY > now)
+                                       break; // Head is not urgent yet, try the next priority.
+                               int thisSize = item.getLength();
+                               if(size + thisSize > maxSize) {
+                                       if(size == minSize) {
+                                               // Send it anyway, nothing else to send.
+                                               size += thisSize;
+                                               items.removeFirst();
+                                               messages.add(item);
+                                       }
+                                       return -size; // More items won't fit.
+                               }
+                               size += thisSize;
+                               items.removeFirst();
+                               messages.add(item);
+                       }
+               }
+               return size;
+       }
+
+}

Modified: trunk/freenet/src/freenet/node/PeerNode.java
===================================================================
--- trunk/freenet/src/freenet/node/PeerNode.java        2008-09-23 19:00:00 UTC 
(rev 22779)
+++ trunk/freenet/src/freenet/node/PeerNode.java        2008-09-23 19:08:07 UTC 
(rev 22780)
@@ -208,7 +208,7 @@
        final PeerManager peers;
        /** MessageItem's to send ASAP. 
         * LOCKING: Lock on self, always take that lock last. Sometimes used 
inside PeerNode.this lock. */
-       private final LinkedList[] messagesToSendNow;
+       private final PeerMessageQueue messageQueue;
        /** When did we last receive a SwapRequest? */
        private long timeLastReceivedSwapRequest;
        /** Average interval between SwapRequest's */
@@ -589,10 +589,8 @@
                // Not connected yet; need to handshake
                isConnected = false;

-               messagesToSendNow = new LinkedList[DMT.NUM_PRIORITIES];
-               for(int i=0;i<messagesToSendNow.length;i++)
-                       messagesToSendNow[i] = new LinkedList();
-
+               messageQueue = new PeerMessageQueue();
+               
                decrementHTLAtMaximum = node.random.nextFloat() < 
Node.DECREMENT_AT_MAX_PROB;
                decrementHTLAtMinimum = node.random.nextFloat() < 
Node.DECREMENT_AT_MIN_PROB;

@@ -1035,19 +1033,7 @@
                MessageItem item = new MessageItem(msg, cb == null ? null : new 
AsyncMessageCallback[]{cb}, ctr, this);
                long now = System.currentTimeMillis();
                reportBackoffStatus(now);
-               int x = 0;
-               synchronized(messagesToSendNow) {
-                       enqueuePrioritizedMessageItem(item);
-                       for(int p=0;p<messagesToSendNow.length;p++) {
-                       Iterator i = messagesToSendNow[p].iterator();
-                       for(; i.hasNext();) {
-                               MessageItem it = (MessageItem) (i.next());
-                               x += it.getLength() + 2;
-                               if(x > 1024)
-                                       break;
-                       }
-                       }
-               }
+               int x = messageQueue.queueAndEstimateSize(item);
                if(x > 1024 || !node.enablePacketCoalescing) {
                        // If there is a packet's worth to send, wake up the 
packetsender.
                        node.ps.wakeUp();
@@ -1058,39 +1044,10 @@
        }

        public long getMessageQueueLengthBytes() {
-               long x = 0;
-               synchronized(messagesToSendNow) {
-                       for(int p=0;p<messagesToSendNow.length;p++) {
-                       Iterator i = messagesToSendNow[p].iterator();
-                       for(; i.hasNext();) {
-                               MessageItem it = (MessageItem) (i.next());
-                               x += it.getLength() + 2;
-                       }
-                       }
-               }
-               return x;
+               return messageQueue.getMessageQueueLengthBytes();
        }

-       private void enqueuePrioritizedMessageItem(MessageItem addMe) {
-               synchronized (messagesToSendNow) {
-                       //Assume it goes on the end, both the common case
-                       short prio = addMe.getPriority();
-                       messagesToSendNow[prio].addLast(addMe);
-               }
-       }
-       
        /**
-        * like enqueuePrioritizedMessageItem, but adds it to the front of 
those in the same priority.
-        */
-       private void pushfrontPrioritizedMessageItem(MessageItem addMe) {
-               synchronized (messagesToSendNow) {
-                       //Assume it goes on the front
-                       short prio = addMe.getPriority();
-                       messagesToSendNow[prio].addFirst(addMe);
-               }
-       }       
-       
-       /**
         * Returns the number of milliseconds that it is estimated to take to 
transmit the currently queued packets.
         */
        public long getProbableSendQueueTime() {
@@ -1256,18 +1213,7 @@
        * Message's.
        */
        public MessageItem[] grabQueuedMessageItems() {
-               synchronized(messagesToSendNow) {
-                       int size = 0;
-                       for(int i=0;i<messagesToSendNow.length;i++)
-                               size += messagesToSendNow[i].size();
-                       MessageItem[] output = new MessageItem[size];
-                       int ptr = 0;
-                       for(int i=0;i<messagesToSendNow.length;i++) {
-                               for(Object item : messagesToSendNow[i])
-                                       output[ptr++] = (MessageItem) item;
-                       }
-                       return output;
-               }
+               return messageQueue.grabQueuedMessageItems();
        }

        public void requeueMessageItems(MessageItem[] messages, int offset, int 
length, boolean dontLog) {
@@ -1296,10 +1242,10 @@
                                Logger.normal(this, "Requeueing " + 
messages.length + " messages" + reasonWrapper + " on " + this + 
rateLimitWrapper);
                        }
                }
-               synchronized(messagesToSendNow) {
+               synchronized(messageQueue) {
                        for(int i = offset+length-1; i >= offset; i--)
                                if(messages[i] != null)
-                                       
pushfrontPrioritizedMessageItem(messages[i]);
+                                       
messageQueue.pushfrontPrioritizedMessageItem(messages[i]);
                }
        }

@@ -1328,15 +1274,7 @@
                        if(kt.hasPacketsToResend()) return now;
                }
                }
-               synchronized(messagesToSendNow) {
-                       for(LinkedList list : messagesToSendNow) {
-                               if(list.isEmpty()) continue;
-                               MessageItem item = (MessageItem) 
list.getFirst();
-                               if(item.submitted + 
PacketSender.MAX_COALESCING_DELAY < now && logMINOR)
-                                       Logger.minor(this, "Message queued to 
send immediately");
-                               t = Math.min(t, item.submitted + 
PacketSender.MAX_COALESCING_DELAY);
-                       }
-               }
+               t = messageQueue.getNextUrgentTime(t, now);
                return t;
        }

@@ -4140,103 +4078,34 @@

                ArrayList<MessageItem> messages = new 
ArrayList<MessageItem>(10);

-               synchronized(messagesToSendNow) {
+               synchronized(messageQueue) {

                        // Any urgent messages to send now?

                        if(!mustSend) {
-                               for(int i=0;i<messagesToSendNow.length;i++) {
-                                       if(!messagesToSendNow[i].isEmpty()) {
-                                               if(((MessageItem) 
messagesToSendNow[i].getFirst()).submitted + 
-                                                               
PacketSender.MAX_COALESCING_DELAY <= now) {
-                                                       mustSend = true;
-                                                       break;
-                                               }
-                                       }
-                               }
+                               if(messageQueue.mustSendNow(now))
+                                       mustSend = true;
                        }

                        if(!mustSend) {
                                // What about total length?
-                               int length = minSize;
-                               for(LinkedList items : messagesToSendNow) {
-                                       for(Object o : items) {
-                                               MessageItem i = (MessageItem) o;
-                                               if(length + maxSize > maxSize) {
-                                                       mustSend = true;
-                                                       break;
-                                               } else length += maxSize;
-                                       }
-                               }
+                               if(messageQueue.mustSendSize(minSize, maxSize))
+                                       mustSend = true;
                        }

                        if(mustSend) {
                                int size = minSize;
                                boolean gotEnough = false;
-                               while(!gotEnough) {
-                                       // Urgent messages first.
-                                       boolean foundNothingUrgent = true;
-                                       for(LinkedList items : 
messagesToSendNow) {
-                                               while(!gotEnough) {
-                                                       if(items.isEmpty()) {
-                                                               break;
-                                                       }
-                                                       MessageItem item = 
(MessageItem) items.getFirst();
-                                                       if(item.submitted + 
PacketSender.MAX_COALESCING_DELAY <= now) {
-                                                               
foundNothingUrgent = false;
-                                                               int thisSize = 
item.getLength();
-                                                               if(size + 
thisSize > maxSize) {
-                                                                       if(size 
== minSize) {
-                                                                               
// Send it anyway, nothing else to send.
-                                                                               
size += thisSize;
-                                                                               
items.removeFirst();
-                                                                               
messages.add(item);
-                                                                               
gotEnough = true;
-                                                                               
break;
-                                                                       }
-                                                                       
gotEnough = true;
-                                                                       break; 
// More items won't fit.
-                                                               }
-                                                               size += 
thisSize;
-                                                               
items.removeFirst();
-                                                               
messages.add(item);
-                                                       } else {
-                                                               break;
-                                                       }
-                                               }
-                                       }
-                                       if(foundNothingUrgent) break;
+                               size = messageQueue.addUrgentMessages(size, 
now, minSize, maxSize, messages);
+                               if(size < 0) {
+                                       gotEnough = true;
+                                       size = -size;
                                }
-                               
+
                                // Now the not-so-urgent messages.
-                               while(!gotEnough) {
-                                       boolean foundNothing = true;
-                                       for(LinkedList items : 
messagesToSendNow) {
-                                               while(!gotEnough) {
-                                                       if(items.isEmpty()) {
-                                                               break;
-                                                       }
-                                                       MessageItem item = 
(MessageItem) items.getFirst();
-                                                       foundNothing = false;
-                                                       int thisSize = 
item.getLength();
-                                                       if(size + thisSize > 
maxSize) {
-                                                               if(size == 
minSize) {
-                                                                       // Send 
it anyway, nothing else to send.
-                                                                       size += 
thisSize;
-                                                                       
items.removeFirst();
-                                                                       
messages.add(item);
-                                                                       
gotEnough = true;
-                                                                       break;
-                                                               }
-                                                               gotEnough = 
true;
-                                                               break; // More 
items won't fit.
-                                                       }
-                                                       size += thisSize;
-                                                       items.removeFirst();
-                                                       messages.add(item);
-                                               }
-                                       }
-                                       if(foundNothing) break;
+                               if(!gotEnough) {
+                                       size = 
messageQueue.addNonUrgentMessages(size, now, minSize, maxSize, messages);
+                                       if(size < 0) size = -size;
                                }
                        }



Reply via email to