Author: nextgens
Date: 2009-04-19 23:23:26 +0000 (Sun, 19 Apr 2009)
New Revision: 27088
Modified:
branches/nextgens-stuffs/freenet/src/freenet/io/comm/MessageCore.java
Log:
The previous attempts to speed up message matching were silly: neither the
source nor the spec is mandatory in filters (*sigh*). There is no point in
trying to reduce the size of the _filters list on the assumption that they are.
Modified: branches/nextgens-stuffs/freenet/src/freenet/io/comm/MessageCore.java
===================================================================
--- branches/nextgens-stuffs/freenet/src/freenet/io/comm/MessageCore.java
2009-04-19 22:32:31 UTC (rev 27087)
+++ branches/nextgens-stuffs/freenet/src/freenet/io/comm/MessageCore.java
2009-04-19 23:23:26 UTC (rev 27088)
@@ -23,11 +23,7 @@
import java.util.LinkedList;
import java.util.ListIterator;
import java.util.Map;
-import java.util.TreeMap;
import java.util.Vector;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
import freenet.node.PeerNode;
import freenet.node.Ticker;
@@ -52,10 +48,8 @@
}
private Dispatcher _dispatcher;
- private final Lock messageFiltersReadLock;
- private final Lock messageFiltersWriteLock;
- /** messageFiltersLock serves as lock for both _filters and _unclaimed
*/
- private final Map<PeerContext, LinkedList<MessageFilter>> _filters =
new TreeMap<PeerContext,LinkedList<MessageFilter>>();
+ /** _filters serves as lock for both */
+ private final LinkedList<MessageFilter> _filters = new
LinkedList<MessageFilter>();
private final LinkedList<Message> _unclaimed = new
LinkedList<Message>();
private static final int MAX_UNMATCHED_FIFO_SIZE = 50000;
private static final long MAX_UNCLAIMED_FIFO_ITEM_LIFETIME =
10*60*1000; // 10 minutes; maybe this should be per message type??
@@ -69,9 +63,6 @@
public MessageCore() {
_timedOutFilters = new Vector<MessageFilter>(32);
- ReadWriteLock messageFiltersLock = new
ReentrantReadWriteLock(true);
- messageFiltersReadLock = messageFiltersLock.readLock();
- messageFiltersWriteLock = messageFiltersLock.writeLock();
}
/**
@@ -123,33 +114,21 @@
// Avoids exhaustive and unsuccessful search in waitFor()
removal of a timed out filter.
if(logMINOR)
Logger.minor(this, "Removing timed out filters");
-
- try {
- messageFiltersWriteLock.lock();
- for(LinkedList<MessageFilter> mfl : _filters.values()) {
- for (ListIterator<MessageFilter> i =
mfl.listIterator(); i.hasNext();) {
- MessageFilter f = i.next();
- if (f.timedOut(tStart)) {
- if(logMINOR)
- Logger.minor(this,
"Removing "+f);
- try {
-
messageFiltersWriteLock.lock();
- i.remove();
- } finally {
-
messageFiltersWriteLock.unlock();
- }
- _timedOutFilters.add(f);
- }
- // Do not break after finding a
non-timed-out filter because some filters may
- // be timed out because their client
callbacks say they should be.
- // Also simplifies the logic
significantly, we've had some major bugs here.
-
- // See also the end of waitFor() for
another weird case.
-
+ synchronized (_filters) {
+ for (ListIterator<MessageFilter> i =
_filters.listIterator(); i.hasNext();) {
+ MessageFilter f = i.next();
+ if (f.timedOut(tStart)) {
+ if(logMINOR)
+ Logger.minor(this, "Removing
"+f);
+ i.remove();
+ _timedOutFilters.add(f);
}
+ // Do not break after finding a non-timed-out
filter because some filters may
+ // be timed out because their client callbacks
say they should be.
+ // Also simplifies the logic significantly,
we've had some major bugs here.
+
+ // See also the end of waitFor() for another
weird case.
}
- } finally {
- messageFiltersWriteLock.unlock();
}
for(MessageFilter f : _timedOutFilters) {
@@ -180,42 +159,27 @@
((PeerNode)m.getSource()).addToLocalNodeReceivedMessagesFromStatistic(m);
}
boolean matched = false;
- if(logMINOR) {
if (!(m.getSpec().equals(DMT.packetTransmit))) {
- Logger.minor(this, "" + (System.currentTimeMillis() %
60000) + ' ' + from + " <- "
+ if(logMINOR) Logger.minor(this, "" +
(System.currentTimeMillis() % 60000) + ' ' + from + " <- "
+ m.getSource() + " : " + m);
}
- }
MessageFilter match = null;
- try {
- messageFiltersReadLock.lock();
- LinkedList<MessageFilter> list =
_filters.get(m.getSource());
- if(list != null) {
- for (ListIterator<MessageFilter> i =
list.listIterator(); i.hasNext();) {
- MessageFilter f = i.next();
- if (f.match(m, tStart)) {
- try {
-
messageFiltersReadLock.unlock();
-
messageFiltersWriteLock.lock();
- if (f.match(m, tStart))
{
- if
(f.matched()) {
-
Logger.error(this, "removed pre-matched message filter found in _filters: "+f);
- } else {
- matched
= true;
- match =
f;
-
if(logMINOR) Logger.minor(this, "Matched: "+f);
- }
- i.remove();
- }
- break; // Only one
match permitted per message
- } finally {
-
messageFiltersWriteLock.unlock();
- }
- }
+ synchronized (_filters) {
+ for (ListIterator<MessageFilter> i =
_filters.listIterator(); i.hasNext();) {
+ MessageFilter f = i.next();
+ if (f.matched()) {
+ Logger.error(this, "removed pre-matched
message filter found in _filters: "+f);
+ i.remove();
+ continue;
}
+ if (f.match(m, tStart)) {
+ matched = true;
+ i.remove();
+ match = f;
+ if(logMINOR) Logger.minor(this,
"Matched: "+f);
+ break; // Only one match permitted per
message
+ }
}
- } finally {
- messageFiltersReadLock.unlock();
}
if(match != null) {
match.setMessage(m);
@@ -254,42 +218,21 @@
* Another race is possible if we merely recheck the
* filters after we return from dispatcher, for example.
*/
- try {
- messageFiltersReadLock.lock();
+ synchronized (_filters) {
if(logMINOR) Logger.minor(this, "Rechecking
filters and adding message");
- LinkedList<MessageFilter> list =
_filters.get(m.getSource());
- if(list != null) {
- for (ListIterator<MessageFilter> i =
list.listIterator(); i.hasNext();) {
- MessageFilter f = i.next();
- if (f.match(m, tStart)) {
- try {
-
messageFiltersReadLock.unlock();
-
messageFiltersWriteLock.lock();
- if (f.match(m,
tStart)) {
- matched
= true;
- match =
f;
-
i.remove();
-
if(logMINOR) Logger.minor(this, "Matched: "+f);
- break;
// Only one match permitted per message
- }
- } finally {
-
messageFiltersWriteLock.unlock();
- }
- }
+ for (ListIterator<MessageFilter> i =
_filters.listIterator(); i.hasNext();) {
+ MessageFilter f = i.next();
+ if (f.match(m, tStart)) {
+ matched = true;
+ match = f;
+ i.remove();
+ if(logMINOR) Logger.minor(this,
"Matched: "+f);
+ break; // Only one match
permitted per message
}
}
if(!matched) {
while (_unclaimed.size() >
MAX_UNMATCHED_FIFO_SIZE) {
- Message removed = null;
- try {
-
messageFiltersReadLock.unlock();
-
messageFiltersWriteLock.lock();
- if(_unclaimed.size() >
MAX_UNMATCHED_FIFO_SIZE)
-
_unclaimed.removeFirst();
- } finally {
-
messageFiltersWriteLock.unlock();
- }
- if(removed == null) continue;
+ Message removed =
_unclaimed.removeFirst();
long messageLifeTime =
System.currentTimeMillis() - removed.localInstantiationTime;
if ((removed.getSource()) instanceof
PeerNode) {
Logger.normal(this, "Dropping
unclaimed from "+removed.getSource().getPeer()+", lived
"+TimeUtil.formatTime(messageLifeTime, 2, true)+" (quantity)"+": "+removed);
@@ -297,17 +240,9 @@
Logger.normal(this, "Dropping
unclaimed, lived "+TimeUtil.formatTime(messageLifeTime, 2, true)+"
(quantity)"+": "+removed);
}
}
- try {
- messageFiltersReadLock.unlock();
- messageFiltersWriteLock.lock();
- _unclaimed.addLast(m);
- } finally {
-
messageFiltersWriteLock.unlock();
- }
+ _unclaimed.addLast(m);
if(logMINOR) Logger.minor(this, "Done");
}
- } finally {
- messageFiltersReadLock.unlock();
}
if(match != null) {
match.setMessage(m);
@@ -315,38 +250,29 @@
}
}
long tEnd = System.currentTimeMillis();
- long dT = tEnd - tStart;
- if(dT > 50) {
- if(dT > 3000)
- Logger.error(this, "checkFilters took
"+(dT)+"ms with unclaimedFIFOSize of "+_unclaimed.size()+" for matched:
"+matched);
+ if(tEnd - tStart > 50) {
+ if(tEnd - tStart > 3000)
+ Logger.error(this, "checkFilters took
"+(tEnd-tStart)+"ms with unclaimedFIFOSize of "+_unclaimed.size()+" for
matched: "+matched);
else
- if(logMINOR) Logger.minor(this, "checkFilters
took "+(dT)+"ms with unclaimedFIFOSize of "+_unclaimed.size()+" for matched:
"+matched);
+ if(logMINOR) Logger.minor(this, "checkFilters
took "+(tEnd-tStart)+"ms with unclaimedFIFOSize of "+_unclaimed.size()+" for
matched: "+matched);
}
}
/** IncomingPacketFilter should call this when a node is disconnected.
*/
public void onDisconnect(PeerContext ctx) {
ArrayList<MessageFilter> droppedFilters = null; // rare
operation, we can waste objects for better locking
- try {
- messageFiltersWriteLock.lock();
- LinkedList<MessageFilter> list = _filters.get(ctx);
- if(list == null)
- return;
- for (ListIterator<MessageFilter> i =
list.listIterator(); i.hasNext();) {
- MessageFilter f = i.next();
- if(f.matchesDroppedConnection(ctx)) {
- if(droppedFilters == null)
- droppedFilters = new
ArrayList<MessageFilter>();
- droppedFilters.add(f);
- i.remove();
- }
+ synchronized(_filters) {
+ ListIterator<MessageFilter> i = _filters.listIterator();
+ while (i.hasNext()) {
+ MessageFilter f = i.next();
+ if(f.matchesDroppedConnection(ctx)) {
+ if(droppedFilters == null)
+ droppedFilters = new
ArrayList<MessageFilter>();
+ droppedFilters.add(f);
+ i.remove();
+ }
}
- if(!list.isEmpty())
- Logger.error(this, "RACE: how come there is
still something there? "+list.size()+" filter remaining for "+ctx);
- _filters.remove(ctx);
- } finally {
- messageFiltersWriteLock.unlock();
- }
+ }
if(droppedFilters != null) {
for(MessageFilter mf : droppedFilters) {
mf.onDroppedConnection(ctx);
@@ -357,25 +283,17 @@
/** IncomingPacketFilter should call this when a node connects with a
new boot ID */
public void onRestart(PeerContext ctx) {
ArrayList<MessageFilter> droppedFilters = null; // rare
operation, we can waste objects for better locking
- try {
- messageFiltersWriteLock.lock();
- LinkedList<MessageFilter> list = _filters.get(ctx);
- if(list == null)
- return;
- for (ListIterator<MessageFilter> i =
list.listIterator(); i.hasNext();) {
- MessageFilter f = i.next();
- if(f.matchesRestartedConnection(ctx)) {
- if(droppedFilters == null)
- droppedFilters = new
ArrayList<MessageFilter>();
- droppedFilters.add(f);
- i.remove();
- }
+ synchronized(_filters) {
+ ListIterator<MessageFilter> i = _filters.listIterator();
+ while (i.hasNext()) {
+ MessageFilter f = i.next();
+ if(f.matchesRestartedConnection(ctx)) {
+ if(droppedFilters == null)
+ droppedFilters = new
ArrayList<MessageFilter>();
+ droppedFilters.add(f);
+ i.remove();
+ }
}
- if(!list.isEmpty())
- Logger.error(this, "RACE: how come there is
still something there (restart)? "+list.size()+" filter remaining for "+ctx);
- _filters.remove(ctx);
- } finally {
- messageFiltersWriteLock.unlock();
}
if(droppedFilters != null) {
for(MessageFilter mf : droppedFilters) {
@@ -402,8 +320,7 @@
long now = System.currentTimeMillis();
long messageDropTime = now - MAX_UNCLAIMED_FIFO_ITEM_LIFETIME;
long messageLifeTime = 0;
- try {
- messageFiltersWriteLock.lock();
+ synchronized (_filters) {
//Once in the list, it is up to the callback system to
trigger the disconnection, however, we may
//have disconnected between check above and locking, so
we *must* check again.
if(filter.anyConnectionsDropped()) {
@@ -432,26 +349,22 @@
if (ret == null) {
if(logMINOR) Logger.minor(this, "Not in
_unclaimed");
// Insert filter into filter list in order of
timeout
- for(LinkedList<MessageFilter> list :
_filters.values()) {
- ListIterator<MessageFilter> i =
list.listIterator();
- while (true) {
- if (!i.hasNext()) {
- i.add(filter);
- if(logMINOR)
Logger.minor(this, "Added at end");
- break;
- }
- MessageFilter mf = i.next();
- if (mf.getTimeout() >
filter.getTimeout()) {
- i.previous();
- i.add(filter);
- if(logMINOR)
Logger.minor(this, "Added in middle - mf timeout="+mf.getTimeout()+" - my
timeout="+filter.getTimeout());
- break;
- }
+ ListIterator<MessageFilter> i =
_filters.listIterator();
+ while (true) {
+ if (!i.hasNext()) {
+ i.add(filter);
+ if(logMINOR) Logger.minor(this,
"Added at end");
+ break;
}
+ MessageFilter mf = i.next();
+ if (mf.getTimeout() >
filter.getTimeout()) {
+ i.previous();
+ i.add(filter);
+ if(logMINOR) Logger.minor(this,
"Added in middle - mf timeout="+mf.getTimeout()+" - my
timeout="+filter.getTimeout());
+ break;
+ }
}
}
- } finally {
- messageFiltersWriteLock.unlock();
}
if(ret != null) {
filter.setMessage(ret);
@@ -486,33 +399,17 @@
long now = System.currentTimeMillis();
long messageDropTime = now - MAX_UNCLAIMED_FIFO_ITEM_LIFETIME;
long messageLifeTime = 0;
- int unclaimedSize = 0;
- try {
- messageFiltersReadLock.lock();
+ synchronized (_filters) {
if(logMINOR) Logger.minor(this, "Checking _unclaimed");
for (ListIterator<Message> i =
_unclaimed.listIterator(); i.hasNext();) {
Message m = i.next();
- if (filter.match(m,now)) {
- try {
- messageFiltersReadLock.unlock();
- messageFiltersWriteLock.lock();
- if(filter.match(m,now)) {
- i.remove();
- }
- } finally {
-
messageFiltersWriteLock.unlock();
- }
+ if (filter.match(m, now)) {
+ i.remove();
ret = m;
if(logMINOR) Logger.minor(this,
"Matching from _unclaimed");
break;
} else if (m.localInstantiationTime <
messageDropTime) {
- try {
- messageFiltersReadLock.unlock();
- messageFiltersWriteLock.lock();
- i.remove();
- } finally {
-
messageFiltersWriteLock.unlock();
- }
+ i.remove();
messageLifeTime = now -
m.localInstantiationTime;
if ((m.getSource()) instanceof
PeerNode) {
Logger.normal(this, "Dropping
unclaimed from "+m.getSource().getPeer()+", lived
"+TimeUtil.formatTime(messageLifeTime, 2, true)+" (age)"+": "+m);
@@ -524,48 +421,29 @@
if (ret == null) {
if(logMINOR) Logger.minor(this, "Not in
_unclaimed");
// Insert filter into filter list in order of
timeout
- for(LinkedList<MessageFilter> list :
_filters.values()) {
- ListIterator<MessageFilter> i =
list.listIterator();
- while (true) {
- if (!i.hasNext()) {
- try {
-
messageFiltersReadLock.unlock();
-
messageFiltersWriteLock.lock();
- i.add(filter);
- } finally {
-
messageFiltersWriteLock.unlock();
- }
- if(logMINOR)
Logger.minor(this, "Added at end");
- break;
- }
- MessageFilter mf = i.next();
- if (mf.getTimeout() >
filter.getTimeout()) {
- try {
-
messageFiltersReadLock.unlock();
-
messageFiltersWriteLock.lock();
- if
(mf.getTimeout() > filter.getTimeout()) {
-
i.previous();
-
i.add(filter);
- }
- } finally {
-
messageFiltersWriteLock.unlock();
- }
- if(logMINOR)
Logger.minor(this, "Added in middle - mf timeout="+mf.getTimeout()+" - my
timeout="+filter.getTimeout());
- break;
- }
+ ListIterator<MessageFilter> i =
_filters.listIterator();
+ while (true) {
+ if (!i.hasNext()) {
+ i.add(filter);
+ if(logMINOR) Logger.minor(this,
"Added at end");
+ break;
}
+ MessageFilter mf = i.next();
+ if (mf.getTimeout() >
filter.getTimeout()) {
+ i.previous();
+ i.add(filter);
+ if(logMINOR) Logger.minor(this,
"Added in middle - mf timeout="+mf.getTimeout()+" - my
timeout="+filter.getTimeout());
+ break;
+ }
}
}
- } finally {
- unclaimedSize = _unclaimed.size();
- messageFiltersReadLock.unlock();
}
long tEnd = System.currentTimeMillis();
if(tEnd - now > 50) {
if(tEnd - now > 3000)
- Logger.error(this, "waitFor _unclaimed
iteration took "+(tEnd-now)+"ms with unclaimedFIFOSize of "+unclaimedSize+" for
ret of "+ret);
+ Logger.error(this, "waitFor _unclaimed
iteration took "+(tEnd-now)+"ms with unclaimedFIFOSize of "+_unclaimed.size()+"
for ret of "+ret);
else
- if(logMINOR) Logger.minor(this, "waitFor
_unclaimed iteration took "+(tEnd-now)+"ms with unclaimedFIFOSize of
"+unclaimedSize+" for ret of "+ret);
+ if(logMINOR) Logger.minor(this, "waitFor
_unclaimed iteration took "+(tEnd-now)+"ms with unclaimedFIFOSize of
"+_unclaimed.size()+" for ret of "+ret);
}
// Unlock to wait on filter
// Waiting on the filter won't release the outer lock
@@ -594,17 +472,11 @@
if(!filter.matched()) {
// We must remove it from _filters before we return, or
when it is re-added,
// it will be in the list twice, and potentially many
more times than twice!
- try {
+ synchronized(_filters) {
// Fortunately, it will be close to the
beginning of the filters list, having
// just timed out. That is assuming it hasn't
already been removed; in that
// case, this will be slower.
- messageFiltersReadLock.unlock();
- messageFiltersWriteLock.lock();
- if(!filter.matched()) {
- _filters.remove(filter);
- }
- } finally {
- messageFiltersWriteLock.unlock();
+ _filters.remove(filter);
}
}
// Matched a packet, unclaimed or after wait
@@ -645,18 +517,14 @@
* @return the number of received messages that are currently unclaimed
*/
public int getUnclaimedFIFOSize() {
- try {
- messageFiltersReadLock.lock();
+ synchronized (_filters){
return _unclaimed.size();
- } finally {
- messageFiltersReadLock.unlock();
}
}
public Map<String, Integer> getUnclaimedFIFOMessageCounts() {
Map<String, Integer> messageCounts = new HashMap<String,
Integer>();
- try {
- messageFiltersReadLock.lock();
+ synchronized(_filters) {
for (ListIterator<Message> i =
_unclaimed.listIterator(); i.hasNext();) {
Message m = i.next();
String messageName = m.getSpec().getName();
@@ -668,8 +536,6 @@
messageCounts.put(messageName,
messageCount );
}
}
- } finally {
- messageFiltersReadLock.unlock();
}
return messageCounts;
}
_______________________________________________
cvs mailing list
[email protected]
http://emu.freenetproject.org/cgi-bin/mailman/listinfo/cvs