Author: nextgens
Date: 2009-04-17 17:12:49 +0000 (Fri, 17 Apr 2009)
New Revision: 26946
Modified:
branches/nextgens-stuffs/freenet/src/freenet/io/comm/MessageCore.java
Log:
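Reduce lock contention in MessageCore: replace the synchronized(_filters)
blocks with a ReentrantReadWriteLock (separate read and write locks).
Not yet tested; needs testing before it is merged.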
Modified: branches/nextgens-stuffs/freenet/src/freenet/io/comm/MessageCore.java
===================================================================
--- branches/nextgens-stuffs/freenet/src/freenet/io/comm/MessageCore.java
2009-04-17 16:58:16 UTC (rev 26945)
+++ branches/nextgens-stuffs/freenet/src/freenet/io/comm/MessageCore.java
2009-04-17 17:12:49 UTC (rev 26946)
@@ -25,6 +25,9 @@
import java.util.Map;
import java.util.TreeMap;
import java.util.Vector;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
import freenet.node.PeerNode;
import freenet.node.Ticker;
@@ -49,7 +52,10 @@
}
private Dispatcher _dispatcher;
- /** _filters serves as lock for both */
+ /** messageFiltersLock serves as lock for both _filters and _unclaimed */
+ private final ReadWriteLock messageFiltersLock = new
ReentrantReadWriteLock();
+ private final Lock messageFiltersReadLock =
messageFiltersLock.readLock();
+ private final Lock messageFiltersWriteLock =
messageFiltersLock.writeLock();
private final Map<PeerContext, LinkedList<MessageFilter>> _filters =
new TreeMap<PeerContext,LinkedList<MessageFilter>>();
private final LinkedList<Message> _unclaimed = new
LinkedList<Message>();
private static final int MAX_UNMATCHED_FIFO_SIZE = 50000;
@@ -115,14 +121,21 @@
// Avoids exhaustive and unsuccessful search in waitFor() removal of a timed out filter.
if(logMINOR)
Logger.minor(this, "Removing timed out filters");
- synchronized (_filters) {
+
+ try {
+ messageFiltersWriteLock.lock();
for(LinkedList<MessageFilter> mfl : _filters.values()) {
for (ListIterator<MessageFilter> i =
mfl.listIterator(); i.hasNext();) {
MessageFilter f = i.next();
if (f.timedOut(tStart)) {
if(logMINOR)
Logger.minor(this,
"Removing "+f);
- i.remove();
+ try {
+
messageFiltersWriteLock.lock();
+ i.remove();
+ } finally {
+
messageFiltersWriteLock.unlock();
+ }
_timedOutFilters.add(f);
}
// Do not break after finding a
non-timed-out filter because some filters may
@@ -133,6 +146,8 @@
}
}
+ } finally {
+ messageFiltersWriteLock.unlock();
}
for(MessageFilter f : _timedOutFilters) {
@@ -170,19 +185,30 @@
}
}
MessageFilter match = null;
- synchronized (_filters) {
+ try {
+ messageFiltersReadLock.lock();
LinkedList<MessageFilter> list =
_filters.get(m.getSource());
if(list != null) {
for (ListIterator<MessageFilter> i =
list.listIterator(); i.hasNext();) {
MessageFilter f = i.next();
if (f.matched()) {
Logger.error(this, "removed
pre-matched message filter found in _filters: "+f);
- i.remove();
+ try {
+
messageFiltersWriteLock.lock();
+ i.remove();
+ } finally {
+
messageFiltersWriteLock.unlock();
+ }
continue;
}
if (f.match(m, tStart)) {
matched = true;
- i.remove();
+ try {
+
messageFiltersWriteLock.lock();
+ i.remove();
+ } finally {
+
messageFiltersWriteLock.unlock();
+ }
match = f;
if(logMINOR) Logger.minor(this,
"Matched: "+f);
break; // Only one match
permitted per message
@@ -190,6 +216,8 @@
}
}
+ } finally {
+ messageFiltersReadLock.unlock();
}
if(match != null) {
match.setMessage(m);
@@ -228,7 +256,8 @@
* Another race is possible if we merely recheck the
* filters after we return from dispatcher, for example.
*/
- synchronized (_filters) {
+ try {
+ messageFiltersReadLock.lock();
if(logMINOR) Logger.minor(this, "Rechecking
filters and adding message");
LinkedList<MessageFilter> list =
_filters.get(m.getSource());
if(list != null) {
@@ -237,7 +266,12 @@
if (f.match(m, tStart)) {
matched = true;
match = f;
- i.remove();
+ try {
+
messageFiltersWriteLock.lock();
+ i.remove();
+ } finally {
+
messageFiltersWriteLock.unlock();
+ }
if(logMINOR)
Logger.minor(this, "Matched: "+f);
break; // Only one
match permitted per message
}
@@ -245,7 +279,14 @@
}
if(!matched) {
while (_unclaimed.size() >
MAX_UNMATCHED_FIFO_SIZE) {
- Message removed =
_unclaimed.removeFirst();
+ Message removed = null;
+ try {
+
messageFiltersWriteLock.lock();
+
_unclaimed.removeFirst();
+ } finally {
+
messageFiltersWriteLock.unlock();
+ }
+ if(removed == null) continue;
long messageLifeTime =
System.currentTimeMillis() - removed.localInstantiationTime;
if ((removed.getSource()) instanceof
PeerNode) {
Logger.normal(this, "Dropping
unclaimed from "+removed.getSource().getPeer()+", lived
"+TimeUtil.formatTime(messageLifeTime, 2, true)+" (quantity)"+": "+removed);
@@ -253,9 +294,16 @@
Logger.normal(this, "Dropping
unclaimed, lived "+TimeUtil.formatTime(messageLifeTime, 2, true)+"
(quantity)"+": "+removed);
}
}
- _unclaimed.addLast(m);
+ try {
+ messageFiltersWriteLock.lock();
+ _unclaimed.addLast(m);
+ } finally {
+
messageFiltersWriteLock.unlock();
+ }
if(logMINOR) Logger.minor(this, "Done");
}
+ } finally {
+ messageFiltersReadLock.unlock();
}
if(match != null) {
match.setMessage(m);
@@ -275,7 +323,8 @@
/** IncomingPacketFilter should call this when a node is disconnected.
*/
public void onDisconnect(PeerContext ctx) {
ArrayList<MessageFilter> droppedFilters = null; // rare
operation, we can waste objects for better locking
- synchronized(_filters) {
+ try {
+ messageFiltersWriteLock.lock();
LinkedList<MessageFilter> list = _filters.get(ctx);
if(list == null)
return;
@@ -291,6 +340,8 @@
if(!list.isEmpty())
Logger.error(this, "RACE: how come there is
still something there? "+list.size()+" filter remaining for "+ctx);
_filters.remove(ctx);
+ } finally {
+ messageFiltersWriteLock.unlock();
}
if(droppedFilters != null) {
for(MessageFilter mf : droppedFilters) {
@@ -302,7 +353,8 @@
/** IncomingPacketFilter should call this when a node connects with a
new boot ID */
public void onRestart(PeerContext ctx) {
ArrayList<MessageFilter> droppedFilters = null; // rare
operation, we can waste objects for better locking
- synchronized(_filters) {
+ try {
+ messageFiltersWriteLock.lock();
LinkedList<MessageFilter> list = _filters.get(ctx);
if(list == null)
return;
@@ -318,6 +370,8 @@
if(!list.isEmpty())
Logger.error(this, "RACE: how come there is
still something there (restart)? "+list.size()+" filter remaining for "+ctx);
_filters.remove(ctx);
+ } finally {
+ messageFiltersWriteLock.unlock();
}
if(droppedFilters != null) {
for(MessageFilter mf : droppedFilters) {
@@ -344,7 +398,8 @@
long now = System.currentTimeMillis();
long messageDropTime = now - MAX_UNCLAIMED_FIFO_ITEM_LIFETIME;
long messageLifeTime = 0;
- synchronized (_filters) {
+ try {
+ messageFiltersWriteLock.lock();
//Once in the list, it is up to the callback system to
trigger the disconnection, however, we may
//have disconnected between check above and locking, so
we *must* check again.
if(filter.anyConnectionsDropped()) {
@@ -391,6 +446,8 @@
}
}
}
+ } finally {
+ messageFiltersWriteLock.unlock();
}
if(ret != null) {
filter.setMessage(ret);
@@ -425,17 +482,29 @@
long now = System.currentTimeMillis();
long messageDropTime = now - MAX_UNCLAIMED_FIFO_ITEM_LIFETIME;
long messageLifeTime = 0;
- synchronized (_filters) {
+ int unclaimedSize = 0;
+ try {
+ messageFiltersReadLock.lock();
if(logMINOR) Logger.minor(this, "Checking _unclaimed");
for (ListIterator<Message> i =
_unclaimed.listIterator(); i.hasNext();) {
Message m = i.next();
if (filter.match(m,now)) {
- i.remove();
+ try {
+ messageFiltersWriteLock.lock();
+ i.remove();
+ } finally {
+
messageFiltersWriteLock.unlock();
+ }
ret = m;
if(logMINOR) Logger.minor(this,
"Matching from _unclaimed");
break;
} else if (m.localInstantiationTime <
messageDropTime) {
- i.remove();
+ try {
+ messageFiltersWriteLock.lock();
+ i.remove();
+ } finally {
+
messageFiltersWriteLock.unlock();
+ }
messageLifeTime = now -
m.localInstantiationTime;
if ((m.getSource()) instanceof
PeerNode) {
Logger.normal(this, "Dropping
unclaimed from "+m.getSource().getPeer()+", lived
"+TimeUtil.formatTime(messageLifeTime, 2, true)+" (age)"+": "+m);
@@ -451,27 +520,40 @@
ListIterator<MessageFilter> i =
list.listIterator();
while (true) {
if (!i.hasNext()) {
- i.add(filter);
+ try {
+
messageFiltersWriteLock.lock();
+ i.add(filter);
+ } finally {
+
messageFiltersWriteLock.unlock();
+ }
if(logMINOR)
Logger.minor(this, "Added at end");
break;
}
MessageFilter mf = i.next();
if (mf.getTimeout() >
filter.getTimeout()) {
- i.previous();
- i.add(filter);
+ try {
+
messageFiltersWriteLock.lock();
+ i.previous();
+ i.add(filter);
+ } finally {
+
messageFiltersWriteLock.unlock();
+ }
if(logMINOR)
Logger.minor(this, "Added in middle - mf timeout="+mf.getTimeout()+" - my
timeout="+filter.getTimeout());
break;
}
}
}
}
+ } finally {
+ unclaimedSize = _unclaimed.size();
+ messageFiltersReadLock.unlock();
}
long tEnd = System.currentTimeMillis();
if(tEnd - now > 50) {
if(tEnd - now > 3000)
- Logger.error(this, "waitFor _unclaimed
iteration took "+(tEnd-now)+"ms with unclaimedFIFOSize of "+_unclaimed.size()+"
for ret of "+ret);
+ Logger.error(this, "waitFor _unclaimed
iteration took "+(tEnd-now)+"ms with unclaimedFIFOSize of "+unclaimedSize+" for
ret of "+ret);
else
- if(logMINOR) Logger.minor(this, "waitFor
_unclaimed iteration took "+(tEnd-now)+"ms with unclaimedFIFOSize of
"+_unclaimed.size()+" for ret of "+ret);
+ if(logMINOR) Logger.minor(this, "waitFor
_unclaimed iteration took "+(tEnd-now)+"ms with unclaimedFIFOSize of
"+unclaimedSize+" for ret of "+ret);
}
// Unlock to wait on filter
// Waiting on the filter won't release the outer lock
@@ -500,11 +582,14 @@
if(!filter.matched()) {
// We must remove it from _filters before we return, or
when it is re-added,
// it will be in the list twice, and potentially many
more times than twice!
- synchronized(_filters) {
+ try {
// Fortunately, it will be close to the
beginning of the filters list, having
// just timed out. That is assuming it hasn't
already been removed; in that
// case, this will be slower.
+ messageFiltersWriteLock.lock();
_filters.remove(filter);
+ } finally {
+ messageFiltersWriteLock.unlock();
}
}
// Matched a packet, unclaimed or after wait
@@ -545,14 +630,18 @@
* @return the number of received messages that are currently unclaimed
*/
public int getUnclaimedFIFOSize() {
- synchronized (_filters){
+ try {
+ messageFiltersReadLock.lock();
return _unclaimed.size();
+ } finally {
+ messageFiltersReadLock.unlock();
}
}
public Map<String, Integer> getUnclaimedFIFOMessageCounts() {
Map<String, Integer> messageCounts = new HashMap<String,
Integer>();
- synchronized(_filters) {
+ try {
+ messageFiltersReadLock.lock();
for (ListIterator<Message> i =
_unclaimed.listIterator(); i.hasNext();) {
Message m = i.next();
String messageName = m.getSpec().getName();
@@ -564,6 +653,8 @@
messageCounts.put(messageName,
messageCount );
}
}
+ } finally {
+ messageFiltersReadLock.unlock();
}
return messageCounts;
}
_______________________________________________
cvs mailing list
[email protected]
http://emu.freenetproject.org/cgi-bin/mailman/listinfo/cvs