Author: nextgens
Date: 2009-04-17 17:12:49 +0000 (Fri, 17 Apr 2009)
New Revision: 26946

Modified:
   branches/nextgens-stuffs/freenet/src/freenet/io/comm/MessageCore.java
Log:
Reduce locking on MessageCore.

Needs to be tested ^-^

Modified: branches/nextgens-stuffs/freenet/src/freenet/io/comm/MessageCore.java
===================================================================
--- branches/nextgens-stuffs/freenet/src/freenet/io/comm/MessageCore.java       
2009-04-17 16:58:16 UTC (rev 26945)
+++ branches/nextgens-stuffs/freenet/src/freenet/io/comm/MessageCore.java       
2009-04-17 17:12:49 UTC (rev 26946)
@@ -25,6 +25,9 @@
 import java.util.Map;
 import java.util.TreeMap;
 import java.util.Vector;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import freenet.node.PeerNode;
 import freenet.node.Ticker;
@@ -49,7 +52,10 @@
        }
 
        private Dispatcher _dispatcher;
-       /** _filters serves as lock for both */
+       /** lock serves as lock for both */
+       private final ReadWriteLock messageFiltersLock = new 
ReentrantReadWriteLock();
+       private final Lock messageFiltersReadLock = 
messageFiltersLock.readLock();
+       private final Lock messageFiltersWriteLock = 
messageFiltersLock.writeLock();
        private final Map<PeerContext, LinkedList<MessageFilter>> _filters = 
new TreeMap<PeerContext,LinkedList<MessageFilter>>();
        private final LinkedList<Message> _unclaimed = new 
LinkedList<Message>();
        private static final int MAX_UNMATCHED_FIFO_SIZE = 50000;
@@ -115,14 +121,21 @@
                // Avoids exhaustive and unsuccessful search in waitFor() 
removal of a timed out filter.
                if(logMINOR)
                        Logger.minor(this, "Removing timed out filters");
-               synchronized (_filters) {
+
+               try {
+                       messageFiltersWriteLock.lock();
                        for(LinkedList<MessageFilter> mfl : _filters.values()) {
                                for (ListIterator<MessageFilter> i = 
mfl.listIterator(); i.hasNext();) {
                                        MessageFilter f = i.next();
                                        if (f.timedOut(tStart)) {
                                                if(logMINOR)
                                                        Logger.minor(this, 
"Removing "+f);
-                                               i.remove();
+                                               try {
+                                                       
messageFiltersWriteLock.lock();
+                                                       i.remove();
+                                               } finally {
+                                                       
messageFiltersWriteLock.unlock();
+                                               }
                                                _timedOutFilters.add(f);
                                        }
                                        // Do not break after finding a 
non-timed-out filter because some filters may 
@@ -133,6 +146,8 @@
 
                                }
                        }
+               } finally {
+                       messageFiltersWriteLock.unlock();
                }
                
                for(MessageFilter f : _timedOutFilters) {
@@ -170,19 +185,30 @@
                }
                }
                MessageFilter match = null;
-               synchronized (_filters) {
+               try {
+                       messageFiltersReadLock.lock();
                        LinkedList<MessageFilter> list = 
_filters.get(m.getSource());
                        if(list != null) {
                                for (ListIterator<MessageFilter> i = 
list.listIterator(); i.hasNext();) {
                                        MessageFilter f = i.next();
                                        if (f.matched()) {
                                                Logger.error(this, "removed 
pre-matched message filter found in _filters: "+f);
-                                               i.remove();
+                                               try {
+                                                       
messageFiltersWriteLock.lock();
+                                                       i.remove();
+                                               } finally {
+                                                       
messageFiltersWriteLock.unlock();
+                                               }
                                                continue;
                                        }
                                        if (f.match(m, tStart)) {
                                                matched = true;
-                                               i.remove();
+                                               try {
+                                                       
messageFiltersWriteLock.lock();
+                                                       i.remove();
+                                               } finally {
+                                                       
messageFiltersWriteLock.unlock();
+                                               }
                                                match = f;
                                                if(logMINOR) Logger.minor(this, 
"Matched: "+f);
                                                break; // Only one match 
permitted per message
@@ -190,6 +216,8 @@
                                }
 
                        }
+               } finally {
+                       messageFiltersReadLock.unlock();
                }
                if(match != null) {
                        match.setMessage(m);
@@ -228,7 +256,8 @@
                     * Another race is possible if we merely recheck the
                     * filters after we return from dispatcher, for example.
                     */
-                       synchronized (_filters) {
+                       try {
+                               messageFiltersReadLock.lock();
                                if(logMINOR) Logger.minor(this, "Rechecking 
filters and adding message");
                                LinkedList<MessageFilter> list = 
_filters.get(m.getSource());
                                if(list != null) {
@@ -237,7 +266,12 @@
                                                if (f.match(m, tStart)) {
                                                        matched = true;
                                                        match = f;
-                                                       i.remove();
+                                                       try {
+                                                               
messageFiltersWriteLock.lock();
+                                                               i.remove();
+                                                       } finally {
+                                                               
messageFiltersWriteLock.unlock();
+                                                       }
                                                        if(logMINOR) 
Logger.minor(this, "Matched: "+f);
                                                        break; // Only one 
match permitted per message
                                                }
@@ -245,7 +279,14 @@
                                }
                                if(!matched) {
                                    while (_unclaimed.size() > 
MAX_UNMATCHED_FIFO_SIZE) {
-                                       Message removed = 
_unclaimed.removeFirst();
+                                       Message removed = null;
+                                               try {
+                                                       
messageFiltersWriteLock.lock();
+                                                       
_unclaimed.removeFirst();
+                                               } finally {
+                                                       
messageFiltersWriteLock.unlock();
+                                               }
+                                               if(removed == null) continue;
                                        long messageLifeTime = 
System.currentTimeMillis() - removed.localInstantiationTime;
                                        if ((removed.getSource()) instanceof 
PeerNode) {
                                            Logger.normal(this, "Dropping 
unclaimed from "+removed.getSource().getPeer()+", lived 
"+TimeUtil.formatTime(messageLifeTime, 2, true)+" (quantity)"+": "+removed);
@@ -253,9 +294,16 @@
                                            Logger.normal(this, "Dropping 
unclaimed, lived "+TimeUtil.formatTime(messageLifeTime, 2, true)+" 
(quantity)"+": "+removed);
                                        }
                                    }
-                                   _unclaimed.addLast(m);
+                                       try {
+                                               messageFiltersWriteLock.lock();
+                                               _unclaimed.addLast(m);
+                                       } finally {
+                                               
messageFiltersWriteLock.unlock();
+                                       }
                                    if(logMINOR) Logger.minor(this, "Done");
                                }
+                       } finally {
+                               messageFiltersReadLock.unlock();
                        }
                        if(match != null) {
                                match.setMessage(m);
@@ -275,7 +323,8 @@
        /** IncomingPacketFilter should call this when a node is disconnected. 
*/
        public void onDisconnect(PeerContext ctx) {
                ArrayList<MessageFilter> droppedFilters = null; // rare 
operation, we can waste objects for better locking
-               synchronized(_filters) {
+               try {
+                       messageFiltersWriteLock.lock();
                        LinkedList<MessageFilter> list = _filters.get(ctx);
                        if(list == null)
                                return;
@@ -291,6 +340,8 @@
                        if(!list.isEmpty())
                                Logger.error(this, "RACE: how come there is 
still something there? "+list.size()+" filter remaining for "+ctx);
                        _filters.remove(ctx);
+               } finally {
+                       messageFiltersWriteLock.unlock();
                }
            if(droppedFilters != null) {
                for(MessageFilter mf : droppedFilters) {
@@ -302,7 +353,8 @@
        /** IncomingPacketFilter should call this when a node connects with a 
new boot ID */
        public void onRestart(PeerContext ctx) {
                ArrayList<MessageFilter> droppedFilters = null; // rare 
operation, we can waste objects for better locking
-           synchronized(_filters) {
+           try {
+               messageFiltersWriteLock.lock();
                        LinkedList<MessageFilter> list = _filters.get(ctx);
                        if(list == null)
                                return;
@@ -318,6 +370,8 @@
                        if(!list.isEmpty())
                                Logger.error(this, "RACE: how come there is 
still something there (restart)? "+list.size()+" filter remaining for "+ctx);
                        _filters.remove(ctx);
+           } finally {
+               messageFiltersWriteLock.unlock();
            }
            if(droppedFilters != null) {
                for(MessageFilter mf : droppedFilters) {
@@ -344,7 +398,8 @@
                long now = System.currentTimeMillis();
                long messageDropTime = now - MAX_UNCLAIMED_FIFO_ITEM_LIFETIME;
                long messageLifeTime = 0;
-               synchronized (_filters) {
+               try {
+                       messageFiltersWriteLock.lock();
                        //Once in the list, it is up to the callback system to 
trigger the disconnection, however, we may
                        //have disconnected between check above and locking, so 
we *must* check again.
                        if(filter.anyConnectionsDropped()) {
@@ -391,6 +446,8 @@
                                        }
                                }
                        }
+               } finally {
+                       messageFiltersWriteLock.unlock();
                }
                if(ret != null) {
                        filter.setMessage(ret);
@@ -425,17 +482,29 @@
                long now = System.currentTimeMillis();
                long messageDropTime = now - MAX_UNCLAIMED_FIFO_ITEM_LIFETIME;
                long messageLifeTime = 0;
-               synchronized (_filters) {
+               int unclaimedSize = 0;
+               try {
+                       messageFiltersReadLock.lock();
                        if(logMINOR) Logger.minor(this, "Checking _unclaimed");
                        for (ListIterator<Message> i = 
_unclaimed.listIterator(); i.hasNext();) {
                                Message m = i.next();
                                if (filter.match(m,now)) {
-                                       i.remove();
+                                       try {
+                                               messageFiltersWriteLock.lock();
+                                               i.remove();
+                                       } finally {
+                                               
messageFiltersWriteLock.unlock();
+                                       }
                                        ret = m;
                                        if(logMINOR) Logger.minor(this, 
"Matching from _unclaimed");
                                        break;
                                } else if (m.localInstantiationTime < 
messageDropTime) {
-                                       i.remove();
+                                       try {
+                                               messageFiltersWriteLock.lock();
+                                               i.remove();
+                                       } finally {
+                                               
messageFiltersWriteLock.unlock();
+                                       }
                                        messageLifeTime = now - 
m.localInstantiationTime;
                                        if ((m.getSource()) instanceof 
PeerNode) {
                                                Logger.normal(this, "Dropping 
unclaimed from "+m.getSource().getPeer()+", lived 
"+TimeUtil.formatTime(messageLifeTime, 2, true)+" (age)"+": "+m);
@@ -451,27 +520,40 @@
                                        ListIterator<MessageFilter> i = 
list.listIterator();
                                        while (true) {
                                                if (!i.hasNext()) {
-                                                       i.add(filter);
+                                                       try {
+                                                               
messageFiltersWriteLock.lock();
+                                                               i.add(filter);
+                                                       } finally {
+                                                               
messageFiltersWriteLock.unlock();
+                                                       }
                                                        if(logMINOR) 
Logger.minor(this, "Added at end");
                                                        break;
                                                }
                                                MessageFilter mf = i.next();
                                                if (mf.getTimeout() > 
filter.getTimeout()) {
-                                                       i.previous();
-                                                       i.add(filter);
+                                                       try {
+                                                               
messageFiltersWriteLock.lock();
+                                                               i.previous();
+                                                               i.add(filter);
+                                                       } finally {
+                                                               
messageFiltersWriteLock.unlock();
+                                                       }
                                                        if(logMINOR) 
Logger.minor(this, "Added in middle - mf timeout="+mf.getTimeout()+" - my 
timeout="+filter.getTimeout());
                                                        break;
                                                }
                                        }
                                }
                        }
+               } finally {
+                       unclaimedSize = _unclaimed.size();
+                       messageFiltersReadLock.unlock();
                }
                long tEnd = System.currentTimeMillis();
                if(tEnd - now > 50) {
                        if(tEnd - now > 3000)
-                               Logger.error(this, "waitFor _unclaimed 
iteration took "+(tEnd-now)+"ms with unclaimedFIFOSize of "+_unclaimed.size()+" 
for ret of "+ret);
+                               Logger.error(this, "waitFor _unclaimed 
iteration took "+(tEnd-now)+"ms with unclaimedFIFOSize of "+unclaimedSize+" for 
ret of "+ret);
                        else
-                               if(logMINOR) Logger.minor(this, "waitFor 
_unclaimed iteration took "+(tEnd-now)+"ms with unclaimedFIFOSize of 
"+_unclaimed.size()+" for ret of "+ret);
+                               if(logMINOR) Logger.minor(this, "waitFor 
_unclaimed iteration took "+(tEnd-now)+"ms with unclaimedFIFOSize of 
"+unclaimedSize+" for ret of "+ret);
                }
                // Unlock to wait on filter
                // Waiting on the filter won't release the outer lock
@@ -500,11 +582,14 @@
                if(!filter.matched()) {
                        // We must remove it from _filters before we return, or 
when it is re-added,
                        // it will be in the list twice, and potentially many 
more times than twice!
-                       synchronized(_filters) {
+                       try {
                                // Fortunately, it will be close to the 
beginning of the filters list, having
                                // just timed out. That is assuming it hasn't 
already been removed; in that
                                // case, this will be slower.
+                               messageFiltersWriteLock.lock();
                                _filters.remove(filter);
+                       } finally {
+                               messageFiltersWriteLock.unlock();
                        }
                }
                // Matched a packet, unclaimed or after wait
@@ -545,14 +630,18 @@
         * @return the number of received messages that are currently unclaimed
         */
        public int getUnclaimedFIFOSize() {
-               synchronized (_filters){
+               try {
+                       messageFiltersReadLock.lock();
                        return _unclaimed.size();
+               } finally {
+                       messageFiltersReadLock.unlock();
                }
        }
        
        public Map<String, Integer> getUnclaimedFIFOMessageCounts() {
                Map<String, Integer> messageCounts = new HashMap<String, 
Integer>();
-               synchronized(_filters) {
+               try {
+                       messageFiltersReadLock.lock();
                        for (ListIterator<Message> i = 
_unclaimed.listIterator(); i.hasNext();) {
                                Message m = i.next();
                                String messageName = m.getSpec().getName();
@@ -564,6 +653,8 @@
                                        messageCounts.put(messageName, 
messageCount );
                                }
                        }
+               } finally {
+                       messageFiltersReadLock.unlock();
                }
                return messageCounts;
        }

_______________________________________________
cvs mailing list
[email protected]
http://emu.freenetproject.org/cgi-bin/mailman/listinfo/cvs

Reply via email to