Author: nextgens
Date: 2009-04-17 16:42:03 +0000 (Fri, 17 Apr 2009)
New Revision: 26942
Modified:
branches/nextgens-stuffs/freenet/src/freenet/io/comm/MessageCore.java
Log:
More optimization: sort the MessageFilters by Peer to speed up matching
Modified: branches/nextgens-stuffs/freenet/src/freenet/io/comm/MessageCore.java
===================================================================
--- branches/nextgens-stuffs/freenet/src/freenet/io/comm/MessageCore.java
2009-04-17 16:32:59 UTC (rev 26941)
+++ branches/nextgens-stuffs/freenet/src/freenet/io/comm/MessageCore.java
2009-04-17 16:42:03 UTC (rev 26942)
@@ -23,6 +23,7 @@
import java.util.LinkedList;
import java.util.ListIterator;
import java.util.Map;
+import java.util.TreeMap;
import java.util.Vector;
import freenet.node.PeerNode;
@@ -49,7 +50,7 @@
private Dispatcher _dispatcher;
/** _filters serves as lock for both */
- private final LinkedList<MessageFilter> _filters = new
LinkedList<MessageFilter>();
+ private final Map<PeerContext, LinkedList<MessageFilter>> _filters =
new TreeMap<PeerContext,LinkedList<MessageFilter>>();
private final LinkedList<Message> _unclaimed = new
LinkedList<Message>();
private static final int MAX_UNMATCHED_FIFO_SIZE = 50000;
private static final long MAX_UNCLAIMED_FIFO_ITEM_LIFETIME =
10*60*1000; // 10 minutes; maybe this should be per message type??
@@ -115,19 +116,22 @@
if(logMINOR)
Logger.minor(this, "Removing timed out filters");
synchronized (_filters) {
- for (ListIterator<MessageFilter> i =
_filters.listIterator(); i.hasNext();) {
- MessageFilter f = i.next();
- if (f.timedOut(tStart)) {
- if(logMINOR)
- Logger.minor(this, "Removing
"+f);
- i.remove();
- _timedOutFilters.add(f);
+ for(LinkedList<MessageFilter> mfl : _filters.values()) {
+ for (ListIterator<MessageFilter> i =
mfl.listIterator(); i.hasNext();) {
+ MessageFilter f = i.next();
+ if (f.timedOut(tStart)) {
+ if(logMINOR)
+ Logger.minor(this,
"Removing "+f);
+ i.remove();
+ _timedOutFilters.add(f);
+ }
+ // Do not break after finding a
non-timed-out filter because some filters may
+ // be timed out because their client
callbacks say they should be.
+ // Also simplifies the logic
significantly, we've had some major bugs here.
+
+ // See also the end of waitFor() for
another weird case.
+
}
- // Do not break after finding a non-timed-out
filter because some filters may
- // be timed out because their client callbacks
say they should be.
- // Also simplifies the logic significantly,
we've had some major bugs here.
-
- // See also the end of waitFor() for another
weird case.
}
}
@@ -167,20 +171,24 @@
}
MessageFilter match = null;
synchronized (_filters) {
- for (ListIterator<MessageFilter> i =
_filters.listIterator(); i.hasNext();) {
- MessageFilter f = i.next();
- if (f.matched()) {
- Logger.error(this, "removed pre-matched
message filter found in _filters: "+f);
- i.remove();
- continue;
+ LinkedList<MessageFilter> list =
_filters.get(m.getSource());
+ if(list != null) {
+ for (ListIterator<MessageFilter> i =
list.listIterator(); i.hasNext();) {
+ MessageFilter f = i.next();
+ if (f.matched()) {
+ Logger.error(this, "removed
pre-matched message filter found in _filters: "+f);
+ i.remove();
+ continue;
+ }
+ if (f.match(m, tStart)) {
+ matched = true;
+ i.remove();
+ match = f;
+ if(logMINOR) Logger.minor(this,
"Matched: "+f);
+ break; // Only one match
permitted per message
+ }
}
- if (f.match(m, tStart)) {
- matched = true;
- i.remove();
- match = f;
- if(logMINOR) Logger.minor(this,
"Matched: "+f);
- break; // Only one match permitted per
message
- }
+
}
}
if(match != null) {
@@ -222,14 +230,17 @@
*/
synchronized (_filters) {
if(logMINOR) Logger.minor(this, "Rechecking
filters and adding message");
- for (ListIterator<MessageFilter> i =
_filters.listIterator(); i.hasNext();) {
- MessageFilter f = i.next();
- if (f.match(m)) {
- matched = true;
- match = f;
- i.remove();
- if(logMINOR) Logger.minor(this,
"Matched: "+f);
- break; // Only one match
permitted per message
+ LinkedList<MessageFilter> list =
_filters.get(m.getSource());
+ if(list != null) {
+ for (ListIterator<MessageFilter> i =
list.listIterator(); i.hasNext();) {
+ MessageFilter f = i.next();
+ if (f.match(m, tStart)) {
+ matched = true;
+ match = f;
+ i.remove();
+ if(logMINOR)
Logger.minor(this, "Matched: "+f);
+ break; // Only one
match permitted per message
+ }
}
}
if(!matched) {
@@ -264,18 +275,23 @@
/** IncomingPacketFilter should call this when a node is disconnected.
*/
public void onDisconnect(PeerContext ctx) {
ArrayList<MessageFilter> droppedFilters = null; // rare
operation, we can waste objects for better locking
- synchronized(_filters) {
- ListIterator<MessageFilter> i = _filters.listIterator();
- while (i.hasNext()) {
- MessageFilter f = i.next();
- if(f.matchesDroppedConnection(ctx)) {
- if(droppedFilters == null)
- droppedFilters = new
ArrayList<MessageFilter>();
- droppedFilters.add(f);
- i.remove();
- }
+ synchronized(_filters) {
+ LinkedList<MessageFilter> list = _filters.get(ctx);
+ if(list == null)
+ return;
+ for (ListIterator<MessageFilter> i =
list.listIterator(); i.hasNext();) {
+ MessageFilter f = i.next();
+ if(f.matchesDroppedConnection(ctx)) {
+ if(droppedFilters == null)
+ droppedFilters = new
ArrayList<MessageFilter>();
+ droppedFilters.add(f);
+ i.remove();
+ }
}
- }
+ if(!list.isEmpty())
+ Logger.error(this, "RACE: how come there is
still something there? "+list.size()+" filter remaining for "+ctx);
+ _filters.remove(ctx);
+ }
if(droppedFilters != null) {
for(MessageFilter mf : droppedFilters) {
mf.onDroppedConnection(ctx);
@@ -287,16 +303,21 @@
public void onRestart(PeerContext ctx) {
ArrayList<MessageFilter> droppedFilters = null; // rare
operation, we can waste objects for better locking
synchronized(_filters) {
- ListIterator<MessageFilter> i = _filters.listIterator();
- while (i.hasNext()) {
- MessageFilter f = i.next();
- if(f.matchesRestartedConnection(ctx)) {
- if(droppedFilters == null)
- droppedFilters = new
ArrayList<MessageFilter>();
- droppedFilters.add(f);
- i.remove();
- }
+ LinkedList<MessageFilter> list = _filters.get(ctx);
+ if(list == null)
+ return;
+ for (ListIterator<MessageFilter> i =
list.listIterator(); i.hasNext();) {
+ MessageFilter f = i.next();
+ if(f.matchesRestartedConnection(ctx)) {
+ if(droppedFilters == null)
+ droppedFilters = new
ArrayList<MessageFilter>();
+ droppedFilters.add(f);
+ i.remove();
+ }
}
+ if(!list.isEmpty())
+ Logger.error(this, "RACE: how come there is
still something there (restart)? "+list.size()+" filter remaining for "+ctx);
+ _filters.remove(ctx);
}
if(droppedFilters != null) {
for(MessageFilter mf : droppedFilters) {
@@ -352,20 +373,22 @@
if (ret == null) {
if(logMINOR) Logger.minor(this, "Not in
_unclaimed");
// Insert filter into filter list in order of
timeout
- ListIterator<MessageFilter> i =
_filters.listIterator();
- while (true) {
- if (!i.hasNext()) {
- i.add(filter);
- if(logMINOR) Logger.minor(this,
"Added at end");
- break;
+ for(LinkedList<MessageFilter> list :
_filters.values()) {
+ ListIterator<MessageFilter> i =
list.listIterator();
+ while (true) {
+ if (!i.hasNext()) {
+ i.add(filter);
+ if(logMINOR)
Logger.minor(this, "Added at end");
+ break;
+ }
+ MessageFilter mf = i.next();
+ if (mf.getTimeout() >
filter.getTimeout()) {
+ i.previous();
+ i.add(filter);
+ if(logMINOR)
Logger.minor(this, "Added in middle - mf timeout="+mf.getTimeout()+" - my
timeout="+filter.getTimeout());
+ break;
+ }
}
- MessageFilter mf = i.next();
- if (mf.getTimeout() >
filter.getTimeout()) {
- i.previous();
- i.add(filter);
- if(logMINOR) Logger.minor(this,
"Added in middle - mf timeout="+mf.getTimeout()+" - my
timeout="+filter.getTimeout());
- break;
- }
}
}
}
@@ -424,20 +447,22 @@
if (ret == null) {
if(logMINOR) Logger.minor(this, "Not in
_unclaimed");
// Insert filter into filter list in order of
timeout
- ListIterator<MessageFilter> i =
_filters.listIterator();
- while (true) {
- if (!i.hasNext()) {
- i.add(filter);
- if(logMINOR) Logger.minor(this,
"Added at end");
- break;
+ for(LinkedList<MessageFilter> list :
_filters.values()) {
+ ListIterator<MessageFilter> i =
list.listIterator();
+ while (true) {
+ if (!i.hasNext()) {
+ i.add(filter);
+ if(logMINOR)
Logger.minor(this, "Added at end");
+ break;
+ }
+ MessageFilter mf = i.next();
+ if (mf.getTimeout() >
filter.getTimeout()) {
+ i.previous();
+ i.add(filter);
+ if(logMINOR)
Logger.minor(this, "Added in middle - mf timeout="+mf.getTimeout()+" - my
timeout="+filter.getTimeout());
+ break;
+ }
}
- MessageFilter mf = i.next();
- if (mf.getTimeout() >
filter.getTimeout()) {
- i.previous();
- i.add(filter);
- if(logMINOR) Logger.minor(this,
"Added in middle - mf timeout="+mf.getTimeout()+" - my
timeout="+filter.getTimeout());
- break;
- }
}
}
}
_______________________________________________
cvs mailing list
[email protected]
http://emu.freenetproject.org/cgi-bin/mailman/listinfo/cvs