Author: toad
Date: 2006-09-02 20:20:57 +0000 (Sat, 02 Sep 2006)
New Revision: 10372
Modified:
trunk/freenet/src/freenet/io/comm/DMT.java
trunk/freenet/src/freenet/node/CHKInsertSender.java
trunk/freenet/src/freenet/node/NodeDispatcher.java
trunk/freenet/src/freenet/node/PeerManager.java
trunk/freenet/src/freenet/node/RequestSender.java
trunk/freenet/src/freenet/node/SSKInsertSender.java
trunk/freenet/src/freenet/node/Version.java
Log:
960: Implement probe-requests. Not tested yet! Build is otherwise harmless and
will be mandatory on the 9th of September.
Modified: trunk/freenet/src/freenet/io/comm/DMT.java
===================================================================
--- trunk/freenet/src/freenet/io/comm/DMT.java 2006-09-02 19:22:56 UTC (rev
10371)
+++ trunk/freenet/src/freenet/io/comm/DMT.java 2006-09-02 20:20:57 UTC (rev
10372)
@@ -71,6 +71,7 @@
public static final String PING_SEQNO = "pingSequenceNumber";
public static final String LOCATION = "location";
public static final String NEAREST_LOCATION = "nearestLocation";
+ public static final String BEST_LOCATION = "bestLocation";
public static final String TARGET_LOCATION = "targetLocation";
public static final String TYPE = "type";
public static final String PAYLOAD = "payload";
@@ -669,6 +670,72 @@
return msg;
}
+ public static final MessageType FNPProbeRequest = new
MessageType("FNPProbeRequest") {{
+ addField(UID, Long.class);
+ addField(TARGET_LOCATION, Double.class);
+ addField(NEAREST_LOCATION, Double.class);
+ addField(BEST_LOCATION, Double.class);
+ addField(HTL, Short.class);
+ addField(COUNTER, Short.class);
+ }};
+
+ public static final Message createFNPProbeRequest(long uid, double target,
double nearest,
+ double best, short htl, short counter) {
+ Message msg = new Message(FNPProbeRequest);
+ msg.set(UID, uid);
+ msg.set(TARGET_LOCATION, target);
+ msg.set(NEAREST_LOCATION, nearest);
+ msg.set(BEST_LOCATION, best);
+ msg.set(HTL, htl);
+ msg.set(COUNTER, counter);
+ return msg;
+ }
+
+ public static final MessageType FNPProbeReply = new
MessageType("FNPProbeReply") {{
+ addField(UID, Long.class);
+ addField(TARGET_LOCATION, Double.class);
+ addField(NEAREST_LOCATION, Double.class);
+ addField(BEST_LOCATION, Double.class);
+ addField(COUNTER, Short.class);
+ }};
+
+ public static final Message createFNPProbeReply(long uid, double target,
double nearest,
+ double best, short counter) {
+ Message msg = new Message(FNPProbeReply);
+ msg.set(UID, uid);
+ msg.set(TARGET_LOCATION, target);
+ msg.set(NEAREST_LOCATION, nearest);
+ msg.set(BEST_LOCATION, best);
+ msg.set(COUNTER, counter);
+ return msg;
+ }
+
+ public static final MessageType FNPProbeRejected = new
MessageType("FNPProbeRejected") {{
+ addField(UID, Long.class);
+ addField(TARGET_LOCATION, Double.class);
+ addField(NEAREST_LOCATION, Double.class);
+ addField(BEST_LOCATION, Double.class);
+ addField(HTL, Short.class);
+ addField(COUNTER, Short.class);
+ addField(REASON, Short.class);
+ }};
+
+ public static final Message createFNPProbeRejected(long uid, double
target, double nearest,
+ double best, short counter, short reason) {
+ Message msg = new Message(FNPProbeRejected);
+ msg.set(UID, uid);
+ msg.set(TARGET_LOCATION, target);
+ msg.set(NEAREST_LOCATION, nearest);
+ msg.set(BEST_LOCATION, best);
+ msg.set(COUNTER, counter);
+ msg.set(REASON, reason);
+ return msg;
+ }
+
+ static public final short PROBE_REJECTED_LOOP = 1;
+ static public final short PROBE_REJECTED_RNF = 2;
+ static public final short PROBE_REJECTED_OVERLOAD = 3;
+
public static final MessageType FNPSwapRequest = new
MessageType("FNPSwapRequest") {{
addField(UID, Long.class);
addField(HASH, ShortBuffer.class);
Modified: trunk/freenet/src/freenet/node/CHKInsertSender.java
===================================================================
--- trunk/freenet/src/freenet/node/CHKInsertSender.java 2006-09-02 19:22:56 UTC
(rev 10371)
+++ trunk/freenet/src/freenet/node/CHKInsertSender.java 2006-09-02 20:20:57 UTC
(rev 10372)
@@ -239,7 +239,7 @@
PeerNode next;
// Can backtrack, so only route to nodes closer than we are to
target.
double nextValue;
- next = node.peers.closerPeer(source, nodesRoutedTo,
nodesNotIgnored, target, true, node.isAdvancedDarknetEnabled());
+ next = node.peers.closerPeer(source, nodesRoutedTo,
nodesNotIgnored, target, true, node.isAdvancedDarknetEnabled(), -1);
if(next != null)
nextValue = next.getLocation().getValue();
else
Modified: trunk/freenet/src/freenet/node/NodeDispatcher.java
===================================================================
--- trunk/freenet/src/freenet/node/NodeDispatcher.java 2006-09-02 19:22:56 UTC
(rev 10371)
+++ trunk/freenet/src/freenet/node/NodeDispatcher.java 2006-09-02 20:20:57 UTC
(rev 10372)
@@ -9,6 +9,8 @@
import freenet.io.comm.MessageType;
import freenet.io.comm.NotConnectedException;
import freenet.io.comm.Peer;
+import freenet.support.LRUHashtable;
+import freenet.support.LRUQueue;
import freenet.support.Logger;
/**
@@ -93,12 +95,6 @@
return node.lm.handleSwapCommit(m);
} else if(spec == DMT.FNPSwapComplete) {
return node.lm.handleSwapComplete(m);
- } else if(spec == DMT.FNPRoutedPing) {
- return handleRouted(m);
- } else if(spec == DMT.FNPRoutedPong) {
- return handleRoutedReply(m);
- } else if(spec == DMT.FNPRoutedRejected) {
- return handleRoutedRejected(m);
} else if(spec == DMT.FNPCHKDataRequest) {
return handleDataRequest(m, false);
} else if(spec == DMT.FNPSSKDataRequest) {
@@ -107,11 +103,23 @@
return handleInsertRequest(m, false);
} else if(spec == DMT.FNPSSKInsertRequest) {
return handleInsertRequest(m, true);
- }
+ } else if(spec == DMT.FNPRoutedPing) {
+ return handleRouted(m);
+ } else if(spec == DMT.FNPRoutedPong) {
+ return handleRoutedReply(m);
+ } else if(spec == DMT.FNPRoutedRejected) {
+ return handleRoutedRejected(m);
+ } else if(spec == DMT.FNPProbeRequest) {
+ return handleProbeRequest(m, source);
+ } else if(spec == DMT.FNPProbeReply) {
+ return handleProbeReply(m, source);
+ } else if(spec == DMT.FNPProbeRejected) {
+ return handleProbeRejected(m, source);
+ }
return false;
}
- /**
+ /**
* Handle an incoming FNPDataRequest.
*/
private boolean handleDataRequest(Message m, boolean isSSK) {
@@ -329,7 +337,7 @@
// Forward
m = preForward(m, htl);
while(true) {
- PeerNode next = node.peers.closerPeer(pn, ctx.routedTo,
ctx.notIgnored, target, true, node.isAdvancedDarknetEnabled());
+ PeerNode next = node.peers.closerPeer(pn, ctx.routedTo,
ctx.notIgnored, target, true, node.isAdvancedDarknetEnabled(), -1);
if(logMINOR) Logger.minor(this, "Next: "+next+" message: "+m);
if(next != null) {
// next is connected, or at least has been => next.getPeer()
CANNOT be null.
@@ -388,4 +396,244 @@
}
return false;
}
+
+ // Probe request handling
+
+ long tLastReceivedProbeRequest;
+
+ static final int MAX_PROBE_CONTEXTS = 1000;
+
+ class ProbeContext {
+
+ final PeerNode src; // FIXME make this a weak reference or something ?
- Memory leak with high connection churn
+ final HashSet visitedPeers;
+ short counter;
+ short htl;
+ double nearest;
+ double best;
+
+ public ProbeContext(long id, double target, double best, double
nearest, short htl, short counter, PeerNode src) {
+ visitedPeers = new HashSet();
+ this.counter = counter;
+ this.htl = htl;
+ this.nearest = nearest;
+ this.best = best;
+ this.src = src;
+ }
+
+ }
+
+ final LRUQueue recentProbeRequestIDs = new LRUQueue();
+ final LRUHashtable recentProbeContexts = new LRUHashtable();
+
+ /**
+ * Handle a probe request.
+ * Reject it if it's looped.
+ * Look up (and promote) its context object.
+ * Update its HTL, nearest-seen and best-seen.
+ * Complete it if it has run out of HTL.
+ * Otherwise forward it.
+ **/
+ private boolean handleProbeRequest(Message m, PeerNode src) {
+ long id = m.getLong(DMT.UID);
+ Long lid = new Long(id);
+ double target = m.getDouble(DMT.TARGET_LOCATION);
+ double best = m.getDouble(DMT.BEST_LOCATION);
+ double nearest = m.getDouble(DMT.NEAREST_LOCATION);
+ short htl = m.getShort(DMT.HTL);
+ short counter = m.getShort(DMT.COUNTER);
+ if(logMINOR)
+ Logger.minor(this, "Probe request: "+id+" "+target+"
"+best+" "+nearest+" "+htl+" "+counter);
+ if(recentProbeRequestIDs.contains(lid)) {
+ // Reject: Loop
+ Message reject = DMT.createFNPProbeRejected(id, target,
nearest, best, counter, DMT.PROBE_REJECTED_LOOP);
+ try {
+ src.sendAsync(reject, null, 0, null);
+ } catch (NotConnectedException e) {
+ Logger.error(this, "Not connected rejecting a
probe request from "+src);
+ }
+ return true;
+ }
+ return innerHandleProbeRequest(src, id, lid, target, best,
nearest, htl, counter, true);
+ }
+
+ private boolean innerHandleProbeRequest(PeerNode src, long id, Long lid,
double target, double best,
+ double nearest, short htl, short counter, boolean checkRecent) {
+ if(htl > Node.MAX_HTL) htl = Node.MAX_HTL;
+ if(htl <= 1) htl = 1;
+ ProbeContext ctx = null;
+ boolean rejected = false;
+ synchronized(recentProbeContexts) {
+ if(checkRecent) {
+ long now = System.currentTimeMillis();
+ if(now - tLastReceivedProbeRequest < 1000) {
+ rejected = true;
+ } else
+ tLastReceivedProbeRequest = now;
+ }
+ if(!rejected) {
+ ctx = (ProbeContext)
recentProbeContexts.get(lid);
+ if(ctx == null) {
+ ctx = new ProbeContext(id, target,
best, nearest, htl, counter, src);
+ }
+ recentProbeContexts.push(lid, ctx); // promote
or add
+ while(recentProbeContexts.size() >
MAX_PROBE_CONTEXTS)
+ recentProbeContexts.popValue();
+ }
+ }
+ if(rejected) {
+ // Reject: rate limit
+ Message reject = DMT.createFNPProbeRejected(id, target,
nearest, best, counter, DMT.PROBE_REJECTED_OVERLOAD);
+ try {
+ src.sendAsync(reject, null, 0, null);
+ } catch (NotConnectedException e) {
+ Logger.error(this, "Not connected rejecting a
probe request from "+src);
+ }
+ return true;
+ }
+ // FIXME Update any important values on ctx
+ if(ctx.counter < counter) ctx.counter = counter;
+ double oldDist = PeerManager.distance(ctx.nearest, target);
+ double newDist = PeerManager.distance(nearest, target);
+ // FIXME use this elsewhere? Does it make sense?
+ if(oldDist > newDist) {
+ ctx.htl = htl;
+ ctx.nearest = nearest;
+ } else if(Math.abs(oldDist - newDist) < Double.MIN_VALUE*2) {
+ if(htl > ctx.htl-1) htl =
(short)Math.max(0,(ctx.htl-1));
+ else ctx.htl = htl;
+ } else {
+ Logger.error(this, "Distance increased: "+oldDist+" ->
"+newDist+" htl: "+ctx.htl+" -> "+htl+" , using old HTL and dist");
+ htl = ctx.htl;
+ nearest = ctx.nearest;
+ }
+
+ PeerNode[] peers = node.peers.myPeers;
+
+ // Update best
+
+ for(int i=0;i<peers.length;i++) {
+ if(!peers[i].isConnected()) continue;
+ double loc = peers[i].getLocation().getValue();
+ // We are only interested in locations greater than the
target
+ if(loc <= (best + 2*Double.MIN_VALUE)) continue;
+ if(loc < best) best = loc;
+ }
+
+ // Update nearest
+
+ double myLoc = node.getLocation();
+ if(PeerManager.distance(myLoc, target) >
PeerManager.distance(nearest, target)) {
+ nearest = myLoc;
+ htl = Node.MAX_HTL;
+ } else {
+ htl--;
+ if(htl > Node.MAX_HTL) htl = Node.MAX_HTL;
+ }
+
+ // Complete ?
+ if(htl == 0) {
+ // Complete
+ Message complete = DMT.createFNPProbeReply(id, target,
nearest, best, counter++);
+ try {
+ src.sendAsync(complete, null, 0, null);
+ } catch (NotConnectedException e) {
+ Logger.error(this, "Not connected completing a
probe request from "+src);
+ }
+ return true;
+ }
+
+ // Otherwise route it
+
+ HashSet visited = ctx.visitedPeers;
+
+ while(true) {
+
+ PeerNode pn = node.peers.closerPeer(src, visited, null,
target, true, false, 960);
+
+ if(pn == null) {
+ // Can't complete, because some HTL left
+ // Reject: RNF
+ Message reject = DMT.createFNPProbeRejected(id,
target, nearest, best, counter, DMT.PROBE_REJECTED_RNF);
+ try {
+ src.sendAsync(reject, null, 0, null);
+ } catch (NotConnectedException e) {
+ Logger.error(this, "Not connected
rejecting a probe request from "+src);
+ }
+ return true;
+ }
+
+ visited.add(pn);
+
+ Message forwarded =
+ DMT.createFNPProbeRequest(id, target, nearest,
best, htl, counter++);
+ try {
+ pn.sendAsync(forwarded, null, 0, null);
+ return true;
+ } catch (NotConnectedException e) {
+ Logger.error(this, "Could not forward message:
disconnected: "+pn+" : "+e, e);
+ // Try another one
+ }
+ }
+
+ }
+
+ private boolean handleProbeReply(Message m, PeerNode src) {
+ long id = m.getLong(DMT.UID);
+ Long lid = new Long(id);
+ double target = m.getDouble(DMT.TARGET_LOCATION);
+ double best = m.getDouble(DMT.BEST_LOCATION);
+ double nearest = m.getDouble(DMT.NEAREST_LOCATION);
+ short counter = m.getShort(DMT.COUNTER);
+ if(logMINOR)
+ Logger.minor(this, "Probe reply: "+id+" "+target+"
"+best+" "+nearest);
+ // Just propagate back to source
+ ProbeContext ctx;
+ synchronized(recentProbeContexts) {
+ ctx = (ProbeContext) recentProbeContexts.get(lid);
+ if(ctx == null) {
+ Logger.normal(this, "Could not forward probe
reply back to source for ID "+id);
+ return false;
+ }
+ recentProbeContexts.push(lid, ctx); // promote or add
+ while(recentProbeContexts.size() > MAX_PROBE_CONTEXTS)
+ recentProbeContexts.popValue();
+ }
+
+ Message complete = DMT.createFNPProbeReply(id, target, nearest,
best, counter++);
+ try {
+ ctx.src.sendAsync(complete, null, 0, null);
+ } catch (NotConnectedException e) {
+ Logger.error(this, "Not connected completing a probe
request from "+ctx.src+" (forwarding completion from "+src+")");
+ }
+ return true;
+ }
+
+ private boolean handleProbeRejected(Message m, PeerNode src) {
+ long id = m.getLong(DMT.UID);
+ Long lid = new Long(id);
+ double target = m.getDouble(DMT.TARGET_LOCATION);
+ double best = m.getDouble(DMT.BEST_LOCATION);
+ double nearest = m.getDouble(DMT.NEAREST_LOCATION);
+ short htl = m.getShort(DMT.HTL);
+ short counter = m.getShort(DMT.COUNTER);
+ short reason = m.getShort(DMT.REASON);
+ if(logMINOR)
+ Logger.minor(this, "Probe rejected: "+id+" "+target+"
"+best+" "+nearest+" "+htl+" "+counter+" "+reason);
+
+ ProbeContext ctx;
+ synchronized(recentProbeContexts) {
+ ctx = (ProbeContext) recentProbeContexts.get(lid);
+ if(ctx == null) {
+ Logger.normal(this, "Unknown rejected probe
request ID "+id);
+ return false;
+ }
+ recentProbeContexts.push(lid, ctx); // promote or add
+ while(recentProbeContexts.size() > MAX_PROBE_CONTEXTS)
+ recentProbeContexts.popValue();
+ }
+
+ return innerHandleProbeRequest(src, id, lid, target, best,
nearest, htl, counter, false);
+ }
+
}
Modified: trunk/freenet/src/freenet/node/PeerManager.java
===================================================================
--- trunk/freenet/src/freenet/node/PeerManager.java 2006-09-02 19:22:56 UTC
(rev 10371)
+++ trunk/freenet/src/freenet/node/PeerManager.java 2006-09-02 20:20:57 UTC
(rev 10372)
@@ -476,15 +476,15 @@
* This scans the same array 4 times. It would be better to scan once and
execute 4 callbacks...
* For this reason the metrics are only updated if advanced mode is enabled
*/
- public PeerNode closerPeer(PeerNode pn, HashSet routedTo, HashSet
notIgnored, double loc, boolean ignoreSelf, boolean calculateMisrouting) {
- PeerNode best = _closerPeer(pn, routedTo, notIgnored, loc, ignoreSelf,
false);
+ public PeerNode closerPeer(PeerNode pn, HashSet routedTo, HashSet
notIgnored, double loc, boolean ignoreSelf, boolean calculateMisrouting, int
minVersion) {
+ PeerNode best = _closerPeer(pn, routedTo, notIgnored, loc, ignoreSelf,
false, minVersion);
if(best == null) {
// Backoff is an advisory mechanism for balancing rather than
limiting load.
// So send a request even though everything is backed off.
- return _closerPeer(pn, routedTo, notIgnored, loc, ignoreSelf,
true);
+ return _closerPeer(pn, routedTo, notIgnored, loc, ignoreSelf,
true, minVersion);
}
if (calculateMisrouting) {
- PeerNode nbo = _closerPeer(pn, routedTo, notIgnored, loc,
ignoreSelf, true);
+ PeerNode nbo = _closerPeer(pn, routedTo, notIgnored, loc,
ignoreSelf, true, minVersion);
if(nbo != null) {
node.missRoutingDistance.report(distance(best,
nbo.getLocation().getValue()));
int numberOfConnected =
node.getPeerNodeStatusSize(Node.PEER_NODE_STATUS_CONNECTED);
@@ -501,7 +501,7 @@
* Find the peer, if any, which is closer to the target location
* than we are, and is not included in the provided set.
*/
- private PeerNode _closerPeer(PeerNode pn, HashSet routedTo, HashSet
notIgnored, double loc, boolean ignoreSelf, boolean ignoreBackedOff) {
+ private PeerNode _closerPeer(PeerNode pn, HashSet routedTo, HashSet
notIgnored, double loc, boolean ignoreSelf, boolean ignoreBackedOff, int
minVersion) {
PeerNode[] peers;
synchronized (this) {
peers = connectedPeers;
@@ -532,6 +532,10 @@
if(logMINOR) Logger.minor(this, "Skipping (routing backed off):
"+p.getPeer());
continue;
}
+ if(minVersion > 0 &&
Version.getArbitraryBuildNumber(p.getVersion()) < minVersion) {
+ if(logMINOR) Logger.minor(this, "Skipping old version:
"+p.getPeer());
+ continue;
+ }
count++;
any = p;
double diff = distance(p, loc);
Modified: trunk/freenet/src/freenet/node/RequestSender.java
===================================================================
--- trunk/freenet/src/freenet/node/RequestSender.java 2006-09-02 19:22:56 UTC
(rev 10371)
+++ trunk/freenet/src/freenet/node/RequestSender.java 2006-09-02 20:20:57 UTC
(rev 10372)
@@ -128,7 +128,7 @@
// Route it
PeerNode next;
double nextValue;
- next = node.peers.closerPeer(source, nodesRoutedTo,
nodesNotIgnored, target, true, node.isAdvancedDarknetEnabled());
+ next = node.peers.closerPeer(source, nodesRoutedTo,
nodesNotIgnored, target, true, node.isAdvancedDarknetEnabled(), -1);
if(next != null)
nextValue = next.getLocation().getValue();
else
Modified: trunk/freenet/src/freenet/node/SSKInsertSender.java
===================================================================
--- trunk/freenet/src/freenet/node/SSKInsertSender.java 2006-09-02 19:22:56 UTC
(rev 10371)
+++ trunk/freenet/src/freenet/node/SSKInsertSender.java 2006-09-02 20:20:57 UTC
(rev 10372)
@@ -135,7 +135,7 @@
// Can backtrack, so only route to nodes closer than we are to
target.
double nextValue;
synchronized(node.peers) {
- next = node.peers.closerPeer(source, nodesRoutedTo,
nodesNotIgnored, target, true, node.isAdvancedDarknetEnabled());
+ next = node.peers.closerPeer(source, nodesRoutedTo,
nodesNotIgnored, target, true, node.isAdvancedDarknetEnabled(), -1);
if(next != null)
nextValue = next.getLocation().getValue();
else
Modified: trunk/freenet/src/freenet/node/Version.java
===================================================================
--- trunk/freenet/src/freenet/node/Version.java 2006-09-02 19:22:56 UTC (rev
10371)
+++ trunk/freenet/src/freenet/node/Version.java 2006-09-02 20:20:57 UTC (rev
10372)
@@ -21,17 +21,17 @@
public static final String protocolVersion = "1.0";
/** The build number of the current revision */
- private static final int buildNumber = 959;
+ private static final int buildNumber = 960;
/** Oldest build of Fred we will talk to */
- private static final int oldLastGoodBuild = 944;
- private static final int newLastGoodBuild = 950;
+ private static final int oldLastGoodBuild = 950;
+ private static final int newLastGoodBuild = 960;
private static final long transitionTime;
static {
final Calendar _cal =
Calendar.getInstance(TimeZone.getTimeZone("GMT"));
// year, month - 1 (or constant), day, jour, minute, second
- _cal.set( 2006, Calendar.AUGUST, 22, 0, 0, 0 );
+ _cal.set( 2006, Calendar.SEPTEMBER, 9, 0, 0, 0 );
transitionTime = _cal.getTimeInMillis();
}