Author: nextgens
Date: 2009-04-19 23:23:26 +0000 (Sun, 19 Apr 2009)
New Revision: 27088

Modified:
   branches/nextgens-stuffs/freenet/src/freenet/io/comm/MessageCore.java
Log:
The previous attempts to speed up message matching were silly: neither the 
source nor the spec is mandatory in filters (*sigh*). There is no point in 
trying to reduce the size of the _filters list by assuming they are.

Modified: branches/nextgens-stuffs/freenet/src/freenet/io/comm/MessageCore.java
===================================================================
--- branches/nextgens-stuffs/freenet/src/freenet/io/comm/MessageCore.java       
2009-04-19 22:32:31 UTC (rev 27087)
+++ branches/nextgens-stuffs/freenet/src/freenet/io/comm/MessageCore.java       
2009-04-19 23:23:26 UTC (rev 27088)
@@ -23,11 +23,7 @@
 import java.util.LinkedList;
 import java.util.ListIterator;
 import java.util.Map;
-import java.util.TreeMap;
 import java.util.Vector;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import freenet.node.PeerNode;
 import freenet.node.Ticker;
@@ -52,10 +48,8 @@
        }
 
        private Dispatcher _dispatcher;
-       private final Lock messageFiltersReadLock;
-       private final Lock messageFiltersWriteLock;
-       /** messageFiltersLock serves as lock for both _filters and _unclaimed 
*/
-       private final Map<PeerContext, LinkedList<MessageFilter>> _filters = 
new TreeMap<PeerContext,LinkedList<MessageFilter>>();
+       /** _filters serves as lock for both */
+       private final LinkedList<MessageFilter> _filters = new 
LinkedList<MessageFilter>();
        private final LinkedList<Message> _unclaimed = new 
LinkedList<Message>();
        private static final int MAX_UNMATCHED_FIFO_SIZE = 50000;
        private static final long MAX_UNCLAIMED_FIFO_ITEM_LIFETIME = 
10*60*1000;  // 10 minutes; maybe this should be per message type??
@@ -69,9 +63,6 @@
 
        public MessageCore() {
                _timedOutFilters = new Vector<MessageFilter>(32);
-               ReadWriteLock messageFiltersLock = new 
ReentrantReadWriteLock(true);
-               messageFiltersReadLock = messageFiltersLock.readLock();
-               messageFiltersWriteLock = messageFiltersLock.writeLock();
        }
 
        /**
@@ -123,33 +114,21 @@
                // Avoids exhaustive and unsuccessful search in waitFor() 
removal of a timed out filter.
                if(logMINOR)
                        Logger.minor(this, "Removing timed out filters");
-
-               try {
-                       messageFiltersWriteLock.lock();
-                       for(LinkedList<MessageFilter> mfl : _filters.values()) {
-                               for (ListIterator<MessageFilter> i = 
mfl.listIterator(); i.hasNext();) {
-                                       MessageFilter f = i.next();
-                                       if (f.timedOut(tStart)) {
-                                               if(logMINOR)
-                                                       Logger.minor(this, 
"Removing "+f);
-                                               try {
-                                                       
messageFiltersWriteLock.lock();
-                                                       i.remove();
-                                               } finally {
-                                                       
messageFiltersWriteLock.unlock();
-                                               }
-                                               _timedOutFilters.add(f);
-                                       }
-                                       // Do not break after finding a 
non-timed-out filter because some filters may 
-                                       // be timed out because their client 
callbacks say they should be.
-                                       // Also simplifies the logic 
significantly, we've had some major bugs here.
-
-                                       // See also the end of waitFor() for 
another weird case.
-
+               synchronized (_filters) {
+                       for (ListIterator<MessageFilter> i = 
_filters.listIterator(); i.hasNext();) {
+                               MessageFilter f = i.next();
+                               if (f.timedOut(tStart)) {
+                                       if(logMINOR)
+                                               Logger.minor(this, "Removing 
"+f);
+                                       i.remove();
+                                       _timedOutFilters.add(f);
                                }
+                               // Do not break after finding a non-timed-out 
filter because some filters may 
+                               // be timed out because their client callbacks 
say they should be.
+                               // Also simplifies the logic significantly, 
we've had some major bugs here.
+                               
+                               // See also the end of waitFor() for another 
weird case.
                        }
-               } finally {
-                       messageFiltersWriteLock.unlock();
                }
                
                for(MessageFilter f : _timedOutFilters) {
@@ -180,42 +159,27 @@
                        
((PeerNode)m.getSource()).addToLocalNodeReceivedMessagesFromStatistic(m);
                }
                boolean matched = false;
-               if(logMINOR) {
                if (!(m.getSpec().equals(DMT.packetTransmit))) {
-                       Logger.minor(this, "" + (System.currentTimeMillis() % 
60000) + ' ' + from + " <- "
+                       if(logMINOR) Logger.minor(this, "" + 
(System.currentTimeMillis() % 60000) + ' ' + from + " <- "
                                        + m.getSource() + " : " + m);
                }
-               }
                MessageFilter match = null;
-               try {
-                       messageFiltersReadLock.lock();
-                       LinkedList<MessageFilter> list = 
_filters.get(m.getSource());
-                       if(list != null) {
-                               for (ListIterator<MessageFilter> i = 
list.listIterator(); i.hasNext();) {
-                                       MessageFilter f = i.next();
-                                       if (f.match(m, tStart)) {
-                                               try {
-                                                       
messageFiltersReadLock.unlock();
-                                                       
messageFiltersWriteLock.lock();
-                                                       if (f.match(m, tStart)) 
{
-                                                               if 
(f.matched()) {
-                                                                       
Logger.error(this, "removed pre-matched message filter found in _filters: "+f);
-                                                               } else {
-                                                                       matched 
= true;
-                                                                       match = 
f;
-                                                                       
if(logMINOR) Logger.minor(this, "Matched: "+f);
-                                                               }
-                                                               i.remove();
-                                                       }
-                                                       break; // Only one 
match permitted per message
-                                               } finally {
-                                                       
messageFiltersWriteLock.unlock();
-                                               }
-                                       }
+               synchronized (_filters) {
+                       for (ListIterator<MessageFilter> i = 
_filters.listIterator(); i.hasNext();) {
+                               MessageFilter f = i.next();
+                               if (f.matched()) {
+                                       Logger.error(this, "removed pre-matched 
message filter found in _filters: "+f);
+                                       i.remove();
+                                       continue;
                                }
+                               if (f.match(m, tStart)) {
+                                       matched = true;
+                                       i.remove();
+                                       match = f;
+                                       if(logMINOR) Logger.minor(this, 
"Matched: "+f);
+                                       break; // Only one match permitted per 
message
+                               }
                        }
-               } finally {
-                       messageFiltersReadLock.unlock();
                }
                if(match != null) {
                        match.setMessage(m);
@@ -254,42 +218,21 @@
                     * Another race is possible if we merely recheck the
                     * filters after we return from dispatcher, for example.
                     */
-                       try {
-                               messageFiltersReadLock.lock();
+                       synchronized (_filters) {
                                if(logMINOR) Logger.minor(this, "Rechecking 
filters and adding message");
-                               LinkedList<MessageFilter> list = 
_filters.get(m.getSource());
-                               if(list != null) {
-                                       for (ListIterator<MessageFilter> i = 
list.listIterator(); i.hasNext();) {
-                                               MessageFilter f = i.next();
-                                               if (f.match(m, tStart)) {
-                                                       try {
-                                                               
messageFiltersReadLock.unlock();
-                                                               
messageFiltersWriteLock.lock();
-                                                               if (f.match(m, 
tStart)) {
-                                                                       matched 
= true;
-                                                                       match = 
f;
-                                                                       
i.remove();
-                                                                       
if(logMINOR) Logger.minor(this, "Matched: "+f);
-                                                                       break; 
// Only one match permitted per message
-                                                               }
-                                                       } finally {
-                                                               
messageFiltersWriteLock.unlock();
-                                                       }
-                                               }
+                               for (ListIterator<MessageFilter> i = 
_filters.listIterator(); i.hasNext();) {
+                                       MessageFilter f = i.next();
+                                       if (f.match(m, tStart)) {
+                                               matched = true;
+                                               match = f;
+                                               i.remove();
+                                               if(logMINOR) Logger.minor(this, 
"Matched: "+f);
+                                               break; // Only one match 
permitted per message
                                        }
                                }
                                if(!matched) {
                                    while (_unclaimed.size() > 
MAX_UNMATCHED_FIFO_SIZE) {
-                                       Message removed = null;
-                                               try {
-                                                       
messageFiltersReadLock.unlock();
-                                                       
messageFiltersWriteLock.lock();
-                                                       if(_unclaimed.size() > 
MAX_UNMATCHED_FIFO_SIZE)
-                                                               
_unclaimed.removeFirst();
-                                               } finally {
-                                                       
messageFiltersWriteLock.unlock();
-                                               }
-                                               if(removed == null) continue;
+                                       Message removed = 
_unclaimed.removeFirst();
                                        long messageLifeTime = 
System.currentTimeMillis() - removed.localInstantiationTime;
                                        if ((removed.getSource()) instanceof 
PeerNode) {
                                            Logger.normal(this, "Dropping 
unclaimed from "+removed.getSource().getPeer()+", lived 
"+TimeUtil.formatTime(messageLifeTime, 2, true)+" (quantity)"+": "+removed);
@@ -297,17 +240,9 @@
                                            Logger.normal(this, "Dropping 
unclaimed, lived "+TimeUtil.formatTime(messageLifeTime, 2, true)+" 
(quantity)"+": "+removed);
                                        }
                                    }
-                                       try {
-                                               messageFiltersReadLock.unlock();
-                                               messageFiltersWriteLock.lock();
-                                               _unclaimed.addLast(m);
-                                       } finally {
-                                               
messageFiltersWriteLock.unlock();
-                                       }
+                                   _unclaimed.addLast(m);
                                    if(logMINOR) Logger.minor(this, "Done");
                                }
-                       } finally {
-                               messageFiltersReadLock.unlock();
                        }
                        if(match != null) {
                                match.setMessage(m);
@@ -315,38 +250,29 @@
                        }
                }
                long tEnd = System.currentTimeMillis();
-               long dT = tEnd - tStart; 
-               if(dT > 50) {
-                       if(dT > 3000)
-                               Logger.error(this, "checkFilters took 
"+(dT)+"ms with unclaimedFIFOSize of "+_unclaimed.size()+" for matched: 
"+matched);
+               if(tEnd - tStart > 50) {
+                       if(tEnd - tStart > 3000)
+                               Logger.error(this, "checkFilters took 
"+(tEnd-tStart)+"ms with unclaimedFIFOSize of "+_unclaimed.size()+" for 
matched: "+matched);
                        else
-                               if(logMINOR) Logger.minor(this, "checkFilters 
took "+(dT)+"ms with unclaimedFIFOSize of "+_unclaimed.size()+" for matched: 
"+matched);
+                               if(logMINOR) Logger.minor(this, "checkFilters 
took "+(tEnd-tStart)+"ms with unclaimedFIFOSize of "+_unclaimed.size()+" for 
matched: "+matched);
                }
        }
        
        /** IncomingPacketFilter should call this when a node is disconnected. 
*/
        public void onDisconnect(PeerContext ctx) {
                ArrayList<MessageFilter> droppedFilters = null; // rare 
operation, we can waste objects for better locking
-               try {
-                       messageFiltersWriteLock.lock();
-                       LinkedList<MessageFilter> list = _filters.get(ctx);
-                       if(list == null)
-                               return;
-                       for (ListIterator<MessageFilter> i = 
list.listIterator(); i.hasNext();) {
-                               MessageFilter f = i.next();
-                               if(f.matchesDroppedConnection(ctx)) {
-                                       if(droppedFilters == null)
-                                               droppedFilters = new 
ArrayList<MessageFilter>();
-                                       droppedFilters.add(f);
-                                       i.remove();
-                               }
+           synchronized(_filters) {
+                       ListIterator<MessageFilter> i = _filters.listIterator();
+                       while (i.hasNext()) {
+                           MessageFilter f = i.next();
+                           if(f.matchesDroppedConnection(ctx)) {
+                               if(droppedFilters == null)
+                                       droppedFilters = new 
ArrayList<MessageFilter>();
+                               droppedFilters.add(f);
+                               i.remove();
+                           }
                        }
-                       if(!list.isEmpty())
-                               Logger.error(this, "RACE: how come there is 
still something there? "+list.size()+" filter remaining for "+ctx);
-                       _filters.remove(ctx);
-               } finally {
-                       messageFiltersWriteLock.unlock();
-               }
+           }
            if(droppedFilters != null) {
                for(MessageFilter mf : droppedFilters) {
                        mf.onDroppedConnection(ctx);
@@ -357,25 +283,17 @@
        /** IncomingPacketFilter should call this when a node connects with a 
new boot ID */
        public void onRestart(PeerContext ctx) {
                ArrayList<MessageFilter> droppedFilters = null; // rare 
operation, we can waste objects for better locking
-           try {
-               messageFiltersWriteLock.lock();
-                       LinkedList<MessageFilter> list = _filters.get(ctx);
-                       if(list == null)
-                               return;
-                       for (ListIterator<MessageFilter> i = 
list.listIterator(); i.hasNext();) {
-                               MessageFilter f = i.next();
-                               if(f.matchesRestartedConnection(ctx)) {
-                                       if(droppedFilters == null)
-                                               droppedFilters = new 
ArrayList<MessageFilter>();
-                                       droppedFilters.add(f);
-                                       i.remove();
-                               }
+           synchronized(_filters) {
+                       ListIterator<MessageFilter> i = _filters.listIterator();
+                       while (i.hasNext()) {
+                           MessageFilter f = i.next();
+                           if(f.matchesRestartedConnection(ctx)) {
+                               if(droppedFilters == null)
+                                       droppedFilters = new 
ArrayList<MessageFilter>();
+                               droppedFilters.add(f);
+                               i.remove();
+                           }
                        }
-                       if(!list.isEmpty())
-                               Logger.error(this, "RACE: how come there is 
still something there (restart)? "+list.size()+" filter remaining for "+ctx);
-                       _filters.remove(ctx);
-           } finally {
-               messageFiltersWriteLock.unlock();
            }
            if(droppedFilters != null) {
                for(MessageFilter mf : droppedFilters) {
@@ -402,8 +320,7 @@
                long now = System.currentTimeMillis();
                long messageDropTime = now - MAX_UNCLAIMED_FIFO_ITEM_LIFETIME;
                long messageLifeTime = 0;
-               try {
-                       messageFiltersWriteLock.lock();
+               synchronized (_filters) {
                        //Once in the list, it is up to the callback system to 
trigger the disconnection, however, we may
                        //have disconnected between check above and locking, so 
we *must* check again.
                        if(filter.anyConnectionsDropped()) {
@@ -432,26 +349,22 @@
                        if (ret == null) {
                                if(logMINOR) Logger.minor(this, "Not in 
_unclaimed");
                            // Insert filter into filter list in order of 
timeout
-                               for(LinkedList<MessageFilter> list : 
_filters.values()) {
-                                       ListIterator<MessageFilter> i = 
list.listIterator();
-                                       while (true) {
-                                               if (!i.hasNext()) {
-                                                       i.add(filter);
-                                                       if(logMINOR) 
Logger.minor(this, "Added at end");
-                                                       break;
-                                               }
-                                               MessageFilter mf = i.next();
-                                               if (mf.getTimeout() > 
filter.getTimeout()) {
-                                                       i.previous();
-                                                       i.add(filter);
-                                                       if(logMINOR) 
Logger.minor(this, "Added in middle - mf timeout="+mf.getTimeout()+" - my 
timeout="+filter.getTimeout());
-                                                       break;
-                                               }
+                               ListIterator<MessageFilter> i = 
_filters.listIterator();
+                               while (true) {
+                                       if (!i.hasNext()) {
+                                               i.add(filter);
+                                               if(logMINOR) Logger.minor(this, 
"Added at end");
+                                               break;
                                        }
+                                       MessageFilter mf = i.next();
+                                       if (mf.getTimeout() > 
filter.getTimeout()) {
+                                               i.previous();
+                                               i.add(filter);
+                                               if(logMINOR) Logger.minor(this, 
"Added in middle - mf timeout="+mf.getTimeout()+" - my 
timeout="+filter.getTimeout());
+                                               break;
+                                       }
                                }
                        }
-               } finally {
-                       messageFiltersWriteLock.unlock();
                }
                if(ret != null) {
                        filter.setMessage(ret);
@@ -486,33 +399,17 @@
                long now = System.currentTimeMillis();
                long messageDropTime = now - MAX_UNCLAIMED_FIFO_ITEM_LIFETIME;
                long messageLifeTime = 0;
-               int unclaimedSize = 0;
-               try {
-                       messageFiltersReadLock.lock();
+               synchronized (_filters) {
                        if(logMINOR) Logger.minor(this, "Checking _unclaimed");
                        for (ListIterator<Message> i = 
_unclaimed.listIterator(); i.hasNext();) {
                                Message m = i.next();
-                               if (filter.match(m,now)) {
-                                       try {
-                                               messageFiltersReadLock.unlock();
-                                               messageFiltersWriteLock.lock();
-                                               if(filter.match(m,now)) {
-                                                       i.remove();
-                                               }
-                                       } finally {
-                                               
messageFiltersWriteLock.unlock();
-                                       }
+                               if (filter.match(m, now)) {
+                                       i.remove();
                                        ret = m;
                                        if(logMINOR) Logger.minor(this, 
"Matching from _unclaimed");
                                        break;
                                } else if (m.localInstantiationTime < 
messageDropTime) {
-                                       try {
-                                               messageFiltersReadLock.unlock();
-                                               messageFiltersWriteLock.lock();
-                                               i.remove();
-                                       } finally {
-                                               
messageFiltersWriteLock.unlock();
-                                       }
+                                       i.remove();
                                        messageLifeTime = now - 
m.localInstantiationTime;
                                        if ((m.getSource()) instanceof 
PeerNode) {
                                                Logger.normal(this, "Dropping 
unclaimed from "+m.getSource().getPeer()+", lived 
"+TimeUtil.formatTime(messageLifeTime, 2, true)+" (age)"+": "+m);
@@ -524,48 +421,29 @@
                        if (ret == null) {
                                if(logMINOR) Logger.minor(this, "Not in 
_unclaimed");
                            // Insert filter into filter list in order of 
timeout
-                               for(LinkedList<MessageFilter> list : 
_filters.values()) {
-                                       ListIterator<MessageFilter> i = 
list.listIterator();
-                                       while (true) {
-                                               if (!i.hasNext()) {
-                                                       try {
-                                                               
messageFiltersReadLock.unlock();
-                                                               
messageFiltersWriteLock.lock();
-                                                               i.add(filter);
-                                                       } finally {
-                                                               
messageFiltersWriteLock.unlock();
-                                                       }
-                                                       if(logMINOR) 
Logger.minor(this, "Added at end");
-                                                       break;
-                                               }
-                                               MessageFilter mf = i.next();
-                                               if (mf.getTimeout() > 
filter.getTimeout()) {
-                                                       try {
-                                                               
messageFiltersReadLock.unlock();
-                                                               
messageFiltersWriteLock.lock();
-                                                               if 
(mf.getTimeout() > filter.getTimeout()) {
-                                                                       
i.previous();
-                                                                       
i.add(filter);
-                                                               }
-                                                       } finally {
-                                                               
messageFiltersWriteLock.unlock();
-                                                       }
-                                                       if(logMINOR) 
Logger.minor(this, "Added in middle - mf timeout="+mf.getTimeout()+" - my 
timeout="+filter.getTimeout());
-                                                       break;
-                                               }
+                               ListIterator<MessageFilter> i = 
_filters.listIterator();
+                               while (true) {
+                                       if (!i.hasNext()) {
+                                               i.add(filter);
+                                               if(logMINOR) Logger.minor(this, 
"Added at end");
+                                               break;
                                        }
+                                       MessageFilter mf = i.next();
+                                       if (mf.getTimeout() > 
filter.getTimeout()) {
+                                               i.previous();
+                                               i.add(filter);
+                                               if(logMINOR) Logger.minor(this, 
"Added in middle - mf timeout="+mf.getTimeout()+" - my 
timeout="+filter.getTimeout());
+                                               break;
+                                       }
                                }
                        }
-               } finally {
-                       unclaimedSize = _unclaimed.size();
-                       messageFiltersReadLock.unlock();
                }
                long tEnd = System.currentTimeMillis();
                if(tEnd - now > 50) {
                        if(tEnd - now > 3000)
-                               Logger.error(this, "waitFor _unclaimed 
iteration took "+(tEnd-now)+"ms with unclaimedFIFOSize of "+unclaimedSize+" for 
ret of "+ret);
+                               Logger.error(this, "waitFor _unclaimed 
iteration took "+(tEnd-now)+"ms with unclaimedFIFOSize of "+_unclaimed.size()+" 
for ret of "+ret);
                        else
-                               if(logMINOR) Logger.minor(this, "waitFor 
_unclaimed iteration took "+(tEnd-now)+"ms with unclaimedFIFOSize of 
"+unclaimedSize+" for ret of "+ret);
+                               if(logMINOR) Logger.minor(this, "waitFor 
_unclaimed iteration took "+(tEnd-now)+"ms with unclaimedFIFOSize of 
"+_unclaimed.size()+" for ret of "+ret);
                }
                // Unlock to wait on filter
                // Waiting on the filter won't release the outer lock
@@ -594,17 +472,11 @@
                if(!filter.matched()) {
                        // We must remove it from _filters before we return, or 
when it is re-added,
                        // it will be in the list twice, and potentially many 
more times than twice!
-                       try {
+                       synchronized(_filters) {
                                // Fortunately, it will be close to the 
beginning of the filters list, having
                                // just timed out. That is assuming it hasn't 
already been removed; in that
                                // case, this will be slower.
-                               messageFiltersReadLock.unlock();
-                               messageFiltersWriteLock.lock();
-                               if(!filter.matched()) {
-                                       _filters.remove(filter);
-                               }
-                       } finally {
-                               messageFiltersWriteLock.unlock();
+                               _filters.remove(filter);
                        }
                }
                // Matched a packet, unclaimed or after wait
@@ -645,18 +517,14 @@
         * @return the number of received messages that are currently unclaimed
         */
        public int getUnclaimedFIFOSize() {
-               try {
-                       messageFiltersReadLock.lock();
+               synchronized (_filters){
                        return _unclaimed.size();
-               } finally {
-                       messageFiltersReadLock.unlock();
                }
        }
        
        public Map<String, Integer> getUnclaimedFIFOMessageCounts() {
                Map<String, Integer> messageCounts = new HashMap<String, 
Integer>();
-               try {
-                       messageFiltersReadLock.lock();
+               synchronized(_filters) {
                        for (ListIterator<Message> i = 
_unclaimed.listIterator(); i.hasNext();) {
                                Message m = i.next();
                                String messageName = m.getSpec().getName();
@@ -668,8 +536,6 @@
                                        messageCounts.put(messageName, 
messageCount );
                                }
                        }
-               } finally {
-                       messageFiltersReadLock.unlock();
                }
                return messageCounts;
        }

_______________________________________________
cvs mailing list
[email protected]
http://emu.freenetproject.org/cgi-bin/mailman/listinfo/cvs

Reply via email to