Author: nextgens
Date: 2009-04-17 16:42:03 +0000 (Fri, 17 Apr 2009)
New Revision: 26942

Modified:
   branches/nextgens-stuffs/freenet/src/freenet/io/comm/MessageCore.java
Log:
More optimization: sort the MessageFilters by Peer to speed up matching

Modified: branches/nextgens-stuffs/freenet/src/freenet/io/comm/MessageCore.java
===================================================================
--- branches/nextgens-stuffs/freenet/src/freenet/io/comm/MessageCore.java       2009-04-17 16:32:59 UTC (rev 26941)
+++ branches/nextgens-stuffs/freenet/src/freenet/io/comm/MessageCore.java       2009-04-17 16:42:03 UTC (rev 26942)
@@ -23,6 +23,7 @@
 import java.util.LinkedList;
 import java.util.ListIterator;
 import java.util.Map;
+import java.util.TreeMap;
 import java.util.Vector;
 
 import freenet.node.PeerNode;
@@ -49,7 +50,7 @@
 
        private Dispatcher _dispatcher;
        /** _filters serves as lock for both */
-       private final LinkedList<MessageFilter> _filters = new 
LinkedList<MessageFilter>();
+       private final Map<PeerContext, LinkedList<MessageFilter>> _filters = 
new TreeMap<PeerContext,LinkedList<MessageFilter>>();
        private final LinkedList<Message> _unclaimed = new 
LinkedList<Message>();
        private static final int MAX_UNMATCHED_FIFO_SIZE = 50000;
        private static final long MAX_UNCLAIMED_FIFO_ITEM_LIFETIME = 
10*60*1000;  // 10 minutes; maybe this should be per message type??
@@ -115,19 +116,22 @@
                if(logMINOR)
                        Logger.minor(this, "Removing timed out filters");
                synchronized (_filters) {
-                       for (ListIterator<MessageFilter> i = 
_filters.listIterator(); i.hasNext();) {
-                               MessageFilter f = i.next();
-                               if (f.timedOut(tStart)) {
-                                       if(logMINOR)
-                                               Logger.minor(this, "Removing 
"+f);
-                                       i.remove();
-                                       _timedOutFilters.add(f);
+                       for(LinkedList<MessageFilter> mfl : _filters.values()) {
+                               for (ListIterator<MessageFilter> i = 
mfl.listIterator(); i.hasNext();) {
+                                       MessageFilter f = i.next();
+                                       if (f.timedOut(tStart)) {
+                                               if(logMINOR)
+                                                       Logger.minor(this, 
"Removing "+f);
+                                               i.remove();
+                                               _timedOutFilters.add(f);
+                                       }
+                                       // Do not break after finding a 
non-timed-out filter because some filters may 
+                                       // be timed out because their client 
callbacks say they should be.
+                                       // Also simplifies the logic 
significantly, we've had some major bugs here.
+
+                                       // See also the end of waitFor() for 
another weird case.
+
                                }
-                               // Do not break after finding a non-timed-out 
filter because some filters may 
-                               // be timed out because their client callbacks 
say they should be.
-                               // Also simplifies the logic significantly, 
we've had some major bugs here.
-                               
-                               // See also the end of waitFor() for another 
weird case.
                        }
                }
                
@@ -167,20 +171,24 @@
                }
                MessageFilter match = null;
                synchronized (_filters) {
-                       for (ListIterator<MessageFilter> i = 
_filters.listIterator(); i.hasNext();) {
-                               MessageFilter f = i.next();
-                               if (f.matched()) {
-                                       Logger.error(this, "removed pre-matched 
message filter found in _filters: "+f);
-                                       i.remove();
-                                       continue;
+                       LinkedList<MessageFilter> list = 
_filters.get(m.getSource());
+                       if(list != null) {
+                               for (ListIterator<MessageFilter> i = 
list.listIterator(); i.hasNext();) {
+                                       MessageFilter f = i.next();
+                                       if (f.matched()) {
+                                               Logger.error(this, "removed 
pre-matched message filter found in _filters: "+f);
+                                               i.remove();
+                                               continue;
+                                       }
+                                       if (f.match(m, tStart)) {
+                                               matched = true;
+                                               i.remove();
+                                               match = f;
+                                               if(logMINOR) Logger.minor(this, 
"Matched: "+f);
+                                               break; // Only one match 
permitted per message
+                                       }
                                }
-                               if (f.match(m, tStart)) {
-                                       matched = true;
-                                       i.remove();
-                                       match = f;
-                                       if(logMINOR) Logger.minor(this, 
"Matched: "+f);
-                                       break; // Only one match permitted per 
message
-                               }
+
                        }
                }
                if(match != null) {
@@ -222,14 +230,17 @@
                     */
                        synchronized (_filters) {
                                if(logMINOR) Logger.minor(this, "Rechecking 
filters and adding message");
-                               for (ListIterator<MessageFilter> i = 
_filters.listIterator(); i.hasNext();) {
-                                       MessageFilter f = i.next();
-                                       if (f.match(m)) {
-                                               matched = true;
-                                               match = f;
-                                               i.remove();
-                                               if(logMINOR) Logger.minor(this, 
"Matched: "+f);
-                                               break; // Only one match 
permitted per message
+                               LinkedList<MessageFilter> list = 
_filters.get(m.getSource());
+                               if(list != null) {
+                                       for (ListIterator<MessageFilter> i = 
list.listIterator(); i.hasNext();) {
+                                               MessageFilter f = i.next();
+                                               if (f.match(m, tStart)) {
+                                                       matched = true;
+                                                       match = f;
+                                                       i.remove();
+                                                       if(logMINOR) 
Logger.minor(this, "Matched: "+f);
+                                                       break; // Only one 
match permitted per message
+                                               }
                                        }
                                }
                                if(!matched) {
@@ -264,18 +275,23 @@
        /** IncomingPacketFilter should call this when a node is disconnected. 
*/
        public void onDisconnect(PeerContext ctx) {
                ArrayList<MessageFilter> droppedFilters = null; // rare 
operation, we can waste objects for better locking
-           synchronized(_filters) {
-                       ListIterator<MessageFilter> i = _filters.listIterator();
-                       while (i.hasNext()) {
-                           MessageFilter f = i.next();
-                           if(f.matchesDroppedConnection(ctx)) {
-                               if(droppedFilters == null)
-                                       droppedFilters = new 
ArrayList<MessageFilter>();
-                               droppedFilters.add(f);
-                               i.remove();
-                           }
+               synchronized(_filters) {
+                       LinkedList<MessageFilter> list = _filters.get(ctx);
+                       if(list == null)
+                               return;
+                       for (ListIterator<MessageFilter> i = 
list.listIterator(); i.hasNext();) {
+                               MessageFilter f = i.next();
+                               if(f.matchesDroppedConnection(ctx)) {
+                                       if(droppedFilters == null)
+                                               droppedFilters = new 
ArrayList<MessageFilter>();
+                                       droppedFilters.add(f);
+                                       i.remove();
+                               }
                        }
-           }
+                       if(!list.isEmpty())
+                               Logger.error(this, "RACE: how come there is 
still something there? "+list.size()+" filter remaining for "+ctx);
+                       _filters.remove(ctx);
+               }
            if(droppedFilters != null) {
                for(MessageFilter mf : droppedFilters) {
                        mf.onDroppedConnection(ctx);
@@ -287,16 +303,21 @@
        public void onRestart(PeerContext ctx) {
                ArrayList<MessageFilter> droppedFilters = null; // rare 
operation, we can waste objects for better locking
            synchronized(_filters) {
-                       ListIterator<MessageFilter> i = _filters.listIterator();
-                       while (i.hasNext()) {
-                           MessageFilter f = i.next();
-                           if(f.matchesRestartedConnection(ctx)) {
-                               if(droppedFilters == null)
-                                       droppedFilters = new 
ArrayList<MessageFilter>();
-                               droppedFilters.add(f);
-                               i.remove();
-                           }
+                       LinkedList<MessageFilter> list = _filters.get(ctx);
+                       if(list == null)
+                               return;
+                       for (ListIterator<MessageFilter> i = 
list.listIterator(); i.hasNext();) {
+                               MessageFilter f = i.next();
+                               if(f.matchesRestartedConnection(ctx)) {
+                                       if(droppedFilters == null)
+                                               droppedFilters = new 
ArrayList<MessageFilter>();
+                                       droppedFilters.add(f);
+                                       i.remove();
+                               }
                        }
+                       if(!list.isEmpty())
+                               Logger.error(this, "RACE: how come there is 
still something there (restart)? "+list.size()+" filter remaining for "+ctx);
+                       _filters.remove(ctx);
            }
            if(droppedFilters != null) {
                for(MessageFilter mf : droppedFilters) {
@@ -352,20 +373,22 @@
                        if (ret == null) {
                                if(logMINOR) Logger.minor(this, "Not in 
_unclaimed");
                            // Insert filter into filter list in order of 
timeout
-                               ListIterator<MessageFilter> i = 
_filters.listIterator();
-                               while (true) {
-                                       if (!i.hasNext()) {
-                                               i.add(filter);
-                                               if(logMINOR) Logger.minor(this, 
"Added at end");
-                                               break;
+                               for(LinkedList<MessageFilter> list : 
_filters.values()) {
+                                       ListIterator<MessageFilter> i = 
list.listIterator();
+                                       while (true) {
+                                               if (!i.hasNext()) {
+                                                       i.add(filter);
+                                                       if(logMINOR) 
Logger.minor(this, "Added at end");
+                                                       break;
+                                               }
+                                               MessageFilter mf = i.next();
+                                               if (mf.getTimeout() > 
filter.getTimeout()) {
+                                                       i.previous();
+                                                       i.add(filter);
+                                                       if(logMINOR) 
Logger.minor(this, "Added in middle - mf timeout="+mf.getTimeout()+" - my 
timeout="+filter.getTimeout());
+                                                       break;
+                                               }
                                        }
-                                       MessageFilter mf = i.next();
-                                       if (mf.getTimeout() > 
filter.getTimeout()) {
-                                               i.previous();
-                                               i.add(filter);
-                                               if(logMINOR) Logger.minor(this, 
"Added in middle - mf timeout="+mf.getTimeout()+" - my 
timeout="+filter.getTimeout());
-                                               break;
-                                       }
                                }
                        }
                }
@@ -424,20 +447,22 @@
                        if (ret == null) {
                                if(logMINOR) Logger.minor(this, "Not in 
_unclaimed");
                            // Insert filter into filter list in order of 
timeout
-                               ListIterator<MessageFilter> i = 
_filters.listIterator();
-                               while (true) {
-                                       if (!i.hasNext()) {
-                                               i.add(filter);
-                                               if(logMINOR) Logger.minor(this, 
"Added at end");
-                                               break;
+                               for(LinkedList<MessageFilter> list : 
_filters.values()) {
+                                       ListIterator<MessageFilter> i = 
list.listIterator();
+                                       while (true) {
+                                               if (!i.hasNext()) {
+                                                       i.add(filter);
+                                                       if(logMINOR) 
Logger.minor(this, "Added at end");
+                                                       break;
+                                               }
+                                               MessageFilter mf = i.next();
+                                               if (mf.getTimeout() > 
filter.getTimeout()) {
+                                                       i.previous();
+                                                       i.add(filter);
+                                                       if(logMINOR) 
Logger.minor(this, "Added in middle - mf timeout="+mf.getTimeout()+" - my 
timeout="+filter.getTimeout());
+                                                       break;
+                                               }
                                        }
-                                       MessageFilter mf = i.next();
-                                       if (mf.getTimeout() > 
filter.getTimeout()) {
-                                               i.previous();
-                                               i.add(filter);
-                                               if(logMINOR) Logger.minor(this, 
"Added in middle - mf timeout="+mf.getTimeout()+" - my 
timeout="+filter.getTimeout());
-                                               break;
-                                       }
                                }
                        }
                }

_______________________________________________
cvs mailing list
[email protected]
http://emu.freenetproject.org/cgi-bin/mailman/listinfo/cvs

Reply via email to