Author: toad
Date: 2006-09-02 20:20:57 +0000 (Sat, 02 Sep 2006)
New Revision: 10372

Modified:
   trunk/freenet/src/freenet/io/comm/DMT.java
   trunk/freenet/src/freenet/node/CHKInsertSender.java
   trunk/freenet/src/freenet/node/NodeDispatcher.java
   trunk/freenet/src/freenet/node/PeerManager.java
   trunk/freenet/src/freenet/node/RequestSender.java
   trunk/freenet/src/freenet/node/SSKInsertSender.java
   trunk/freenet/src/freenet/node/Version.java
Log:
960: Implement probe-requests. Not tested yet! Build is otherwise harmless and 
will be mandatory on the 9th of September.

Modified: trunk/freenet/src/freenet/io/comm/DMT.java
===================================================================
--- trunk/freenet/src/freenet/io/comm/DMT.java  2006-09-02 19:22:56 UTC (rev 
10371)
+++ trunk/freenet/src/freenet/io/comm/DMT.java  2006-09-02 20:20:57 UTC (rev 
10372)
@@ -71,6 +71,7 @@
     public static final String PING_SEQNO = "pingSequenceNumber";
     public static final String LOCATION = "location";
     public static final String NEAREST_LOCATION = "nearestLocation";
+    public static final String BEST_LOCATION = "bestLocation";
     public static final String TARGET_LOCATION = "targetLocation";
     public static final String TYPE = "type";
     public static final String PAYLOAD = "payload";
@@ -669,6 +670,72 @@
         return msg;
     }

+    public static final MessageType FNPProbeRequest = new 
MessageType("FNPProbeRequest") {{
+       addField(UID, Long.class);
+       addField(TARGET_LOCATION, Double.class);
+       addField(NEAREST_LOCATION, Double.class);
+       addField(BEST_LOCATION, Double.class);
+       addField(HTL, Short.class);
+       addField(COUNTER, Short.class);
+    }};
+    
+    public static final Message createFNPProbeRequest(long uid, double target, 
double nearest, 
+               double best, short htl, short counter) {
+       Message msg = new Message(FNPProbeRequest);
+       msg.set(UID, uid);
+       msg.set(TARGET_LOCATION, target);
+       msg.set(NEAREST_LOCATION, nearest);
+       msg.set(BEST_LOCATION, best);
+       msg.set(HTL, htl);
+       msg.set(COUNTER, counter);
+       return msg;
+    }
+
+    public static final MessageType FNPProbeReply = new 
MessageType("FNPProbeReply") {{
+       addField(UID, Long.class);
+       addField(TARGET_LOCATION, Double.class);
+       addField(NEAREST_LOCATION, Double.class);
+       addField(BEST_LOCATION, Double.class);
+       addField(COUNTER, Short.class);
+    }};
+    
+    public static final Message createFNPProbeReply(long uid, double target, 
double nearest, 
+               double best, short counter) {
+       Message msg = new Message(FNPProbeReply);
+       msg.set(UID, uid);
+       msg.set(TARGET_LOCATION, target);
+       msg.set(NEAREST_LOCATION, nearest);
+       msg.set(BEST_LOCATION, best);
+       msg.set(COUNTER, counter);
+       return msg;
+    }
+    
+    public static final MessageType FNPProbeRejected = new 
MessageType("FNPProbeRejected") {{
+       addField(UID, Long.class);
+       addField(TARGET_LOCATION, Double.class);
+       addField(NEAREST_LOCATION, Double.class);
+       addField(BEST_LOCATION, Double.class);
+       addField(HTL, Short.class);
+       addField(COUNTER, Short.class);
+       addField(REASON, Short.class);
+    }};
+    
+    public static final Message createFNPProbeRejected(long uid, double 
target, double nearest, 
+               double best, short counter, short reason) {
+       Message msg = new Message(FNPProbeRejected);
+       msg.set(UID, uid);
+       msg.set(TARGET_LOCATION, target);
+       msg.set(NEAREST_LOCATION, nearest);
+       msg.set(BEST_LOCATION, best);
+       msg.set(COUNTER, counter);
+       msg.set(REASON, reason);
+       return msg;
+    }
+
+    static public final short PROBE_REJECTED_LOOP = 1;
+    static public final short PROBE_REJECTED_RNF = 2;
+    static public final short PROBE_REJECTED_OVERLOAD = 3;
+    
     public static final MessageType FNPSwapRequest = new 
MessageType("FNPSwapRequest") {{
         addField(UID, Long.class);
         addField(HASH, ShortBuffer.class);

Modified: trunk/freenet/src/freenet/node/CHKInsertSender.java
===================================================================
--- trunk/freenet/src/freenet/node/CHKInsertSender.java 2006-09-02 19:22:56 UTC 
(rev 10371)
+++ trunk/freenet/src/freenet/node/CHKInsertSender.java 2006-09-02 20:20:57 UTC 
(rev 10372)
@@ -239,7 +239,7 @@
             PeerNode next;
             // Can backtrack, so only route to nodes closer than we are to 
target.
             double nextValue;
-            next = node.peers.closerPeer(source, nodesRoutedTo, 
nodesNotIgnored, target, true, node.isAdvancedDarknetEnabled());
+            next = node.peers.closerPeer(source, nodesRoutedTo, 
nodesNotIgnored, target, true, node.isAdvancedDarknetEnabled(), -1);
             if(next != null)
                 nextValue = next.getLocation().getValue();
             else

Modified: trunk/freenet/src/freenet/node/NodeDispatcher.java
===================================================================
--- trunk/freenet/src/freenet/node/NodeDispatcher.java  2006-09-02 19:22:56 UTC 
(rev 10371)
+++ trunk/freenet/src/freenet/node/NodeDispatcher.java  2006-09-02 20:20:57 UTC 
(rev 10372)
@@ -9,6 +9,8 @@
 import freenet.io.comm.MessageType;
 import freenet.io.comm.NotConnectedException;
 import freenet.io.comm.Peer;
+import freenet.support.LRUHashtable;
+import freenet.support.LRUQueue;
 import freenet.support.Logger;

 /**
@@ -93,12 +95,6 @@
             return node.lm.handleSwapCommit(m);
         } else if(spec == DMT.FNPSwapComplete) {
             return node.lm.handleSwapComplete(m);
-        } else if(spec == DMT.FNPRoutedPing) {
-            return handleRouted(m);
-        } else if(spec == DMT.FNPRoutedPong) {
-            return handleRoutedReply(m);
-        } else if(spec == DMT.FNPRoutedRejected) {
-            return handleRoutedRejected(m);
         } else if(spec == DMT.FNPCHKDataRequest) {
                return handleDataRequest(m, false);
         } else if(spec == DMT.FNPSSKDataRequest) {
@@ -107,11 +103,23 @@
                return handleInsertRequest(m, false);
         } else if(spec == DMT.FNPSSKInsertRequest) {
             return handleInsertRequest(m, true);
-        } 
+        } else if(spec == DMT.FNPRoutedPing) {
+            return handleRouted(m);
+        } else if(spec == DMT.FNPRoutedPong) {
+            return handleRoutedReply(m);
+        } else if(spec == DMT.FNPRoutedRejected) {
+            return handleRoutedRejected(m);
+        } else if(spec == DMT.FNPProbeRequest) {
+               return handleProbeRequest(m, source);
+        } else if(spec == DMT.FNPProbeReply) {
+               return handleProbeReply(m, source);
+        } else if(spec == DMT.FNPProbeRejected) {
+               return handleProbeRejected(m, source);
+        }
         return false;
     }

-    /**
+       /**
      * Handle an incoming FNPDataRequest.
      */
     private boolean handleDataRequest(Message m, boolean isSSK) {
@@ -329,7 +337,7 @@
         // Forward
         m = preForward(m, htl);
         while(true) {
-            PeerNode next = node.peers.closerPeer(pn, ctx.routedTo, 
ctx.notIgnored, target, true, node.isAdvancedDarknetEnabled());
+            PeerNode next = node.peers.closerPeer(pn, ctx.routedTo, 
ctx.notIgnored, target, true, node.isAdvancedDarknetEnabled(), -1);
             if(logMINOR) Logger.minor(this, "Next: "+next+" message: "+m);
             if(next != null) {
                // next is connected, or at least has been => next.getPeer() 
CANNOT be null.
@@ -388,4 +396,244 @@
         }
         return false;
     }
+
+    // Probe request handling
+
+    long tLastReceivedProbeRequest;
+    
+    static final int MAX_PROBE_CONTEXTS = 1000;
+    
+    class ProbeContext {
+
+       final PeerNode src; // FIXME make this a weak reference or something ? 
- Memory leak with high connection churn
+       final HashSet visitedPeers;
+       short counter;
+       short htl;
+       double nearest;
+       double best;
+       
+               public ProbeContext(long id, double target, double best, double 
nearest, short htl, short counter, PeerNode src) {
+                       visitedPeers = new HashSet();
+                       this.counter = counter;
+                       this.htl = htl;
+                       this.nearest = nearest;
+                       this.best = best;
+                       this.src = src;
+               }
+       
+    }
+    
+    final LRUQueue recentProbeRequestIDs = new LRUQueue();
+    final LRUHashtable recentProbeContexts = new LRUHashtable();
+    
+    /** 
+     * Handle a probe request.
+     * Reject it if it's looped.
+     * Look up (and promote) its context object.
+     * Update its HTL, nearest-seen and best-seen.
+     * Complete it if it has run out of HTL.
+     * Otherwise forward it.
+     **/
+       private boolean handleProbeRequest(Message m, PeerNode src) {
+               long id = m.getLong(DMT.UID);
+               Long lid = new Long(id);
+               double target = m.getDouble(DMT.TARGET_LOCATION);
+               double best = m.getDouble(DMT.BEST_LOCATION);
+               double nearest = m.getDouble(DMT.NEAREST_LOCATION);
+               short htl = m.getShort(DMT.HTL);
+               short counter = m.getShort(DMT.COUNTER);
+               if(logMINOR)
+                       Logger.minor(this, "Probe request: "+id+" "+target+" 
"+best+" "+nearest+" "+htl+" "+counter);
+               if(recentProbeRequestIDs.contains(lid)) {
+                       // Reject: Loop
+                       Message reject = DMT.createFNPProbeRejected(id, target, 
nearest, best, counter, DMT.PROBE_REJECTED_LOOP);
+                       try {
+                               src.sendAsync(reject, null, 0, null);
+                       } catch (NotConnectedException e) {
+                               Logger.error(this, "Not connected rejecting a 
probe request from "+src);
+                       }
+                       return true;
+               }
+               return innerHandleProbeRequest(src, id, lid, target, best, 
nearest, htl, counter, true);
+       }
+
+    private boolean innerHandleProbeRequest(PeerNode src, long id, Long lid, 
double target, double best, 
+               double nearest, short htl, short counter, boolean checkRecent) {
+       if(htl > Node.MAX_HTL) htl = Node.MAX_HTL;
+       if(htl <= 1) htl = 1;
+               ProbeContext ctx = null;
+               boolean rejected = false;
+               synchronized(recentProbeContexts) {
+                       if(checkRecent) {
+                               long now = System.currentTimeMillis();
+                               if(now - tLastReceivedProbeRequest < 1000) {
+                                       rejected = true;
+                               } else
+                                       tLastReceivedProbeRequest = now;
+                       }
+                       if(!rejected) {
+                               ctx = (ProbeContext) 
recentProbeContexts.get(lid);
+                               if(ctx == null) {
+                                       ctx = new ProbeContext(id, target, 
best, nearest, htl, counter, src);
+                               }
+                               recentProbeContexts.push(lid, ctx); // promote 
or add
+                               while(recentProbeContexts.size() > 
MAX_PROBE_CONTEXTS)
+                                       recentProbeContexts.popValue();
+                       }
+               }
+               if(rejected) {
+                       // Reject: rate limit
+                       Message reject = DMT.createFNPProbeRejected(id, target, 
nearest, best, counter, DMT.PROBE_REJECTED_OVERLOAD);
+                       try {
+                               src.sendAsync(reject, null, 0, null);
+                       } catch (NotConnectedException e) {
+                               Logger.error(this, "Not connected rejecting a 
probe request from "+src);
+                       }
+                       return true;
+               }
+               // FIXME Update any important values on ctx
+               if(ctx.counter < counter) ctx.counter = counter;
+               double oldDist = PeerManager.distance(ctx.nearest, target);
+               double newDist = PeerManager.distance(nearest, target);
+               // FIXME use this elsewhere? Does it make sense?
+               if(oldDist > newDist) {
+                       ctx.htl = htl;
+                       ctx.nearest = nearest;
+               } else if(Math.abs(oldDist - newDist) < Double.MIN_VALUE*2) {
+                       if(htl > ctx.htl-1) htl = 
(short)Math.max(0,(ctx.htl-1));
+                       else ctx.htl = htl;
+               } else {
+                       Logger.error(this, "Distance increased: "+oldDist+" -> 
"+newDist+" htl: "+ctx.htl+" -> "+htl+" , using old HTL and dist");
+                       htl = ctx.htl;
+                       nearest = ctx.nearest;
+               }
+               
+               PeerNode[] peers = node.peers.myPeers;
+               
+               // Update best
+               
+               for(int i=0;i<peers.length;i++) {
+                       if(!peers[i].isConnected()) continue;
+                       double loc = peers[i].getLocation().getValue();
+                       // We are only interested in locations greater than the 
target
+                       if(loc <= (best + 2*Double.MIN_VALUE)) continue;
+                       if(loc < best) best = loc;
+               }
+               
+               // Update nearest
+               
+               double myLoc = node.getLocation();
+               if(PeerManager.distance(myLoc, target) > 
PeerManager.distance(nearest, target)) {
+                       nearest = myLoc;
+                       htl = Node.MAX_HTL;
+               } else {
+                       htl--;
+                       if(htl > Node.MAX_HTL) htl = Node.MAX_HTL;
+               }
+               
+               // Complete ?
+               if(htl == 0) {
+                       // Complete
+                       Message complete = DMT.createFNPProbeReply(id, target, 
nearest, best, counter++);
+                       try {
+                               src.sendAsync(complete, null, 0, null);
+                       } catch (NotConnectedException e) {
+                               Logger.error(this, "Not connected completing a 
probe request from "+src);
+                       }
+                       return true;
+               }
+               
+               // Otherwise route it
+               
+               HashSet visited = ctx.visitedPeers;
+               
+               while(true) {
+                       
+                       PeerNode pn = node.peers.closerPeer(src, visited, null, 
target, true, false, 960);
+                       
+                       if(pn == null) {
+                               // Can't complete, because some HTL left
+                               // Reject: RNF
+                               Message reject = DMT.createFNPProbeRejected(id, 
target, nearest, best, counter, DMT.PROBE_REJECTED_RNF);
+                               try {
+                                       src.sendAsync(reject, null, 0, null);
+                               } catch (NotConnectedException e) {
+                                       Logger.error(this, "Not connected 
rejecting a probe request from "+src);
+                               }
+                               return true;
+                       }
+                       
+                       visited.add(pn);
+                       
+                       Message forwarded =
+                               DMT.createFNPProbeRequest(id, target, nearest, 
best, htl, counter++);
+                       try {
+                               pn.sendAsync(forwarded, null, 0, null);
+                               return true;
+                       } catch (NotConnectedException e) {
+                               Logger.error(this, "Could not forward message: 
disconnected: "+pn+" : "+e, e);
+                               // Try another one
+                       }
+               }
+               
+       }
+
+       private boolean handleProbeReply(Message m, PeerNode src) {
+               long id = m.getLong(DMT.UID);
+               Long lid = new Long(id);
+               double target = m.getDouble(DMT.TARGET_LOCATION);
+               double best = m.getDouble(DMT.BEST_LOCATION);
+               double nearest = m.getDouble(DMT.NEAREST_LOCATION);
+               short counter = m.getShort(DMT.COUNTER);
+               if(logMINOR)
+                       Logger.minor(this, "Probe reply: "+id+" "+target+" 
"+best+" "+nearest);
+       // Just propagate back to source
+               ProbeContext ctx;
+               synchronized(recentProbeContexts) {
+                       ctx = (ProbeContext) recentProbeContexts.get(lid);
+                       if(ctx == null) {
+                               Logger.normal(this, "Could not forward probe 
reply back to source for ID "+id);
+                               return false;
+                       }
+                       recentProbeContexts.push(lid, ctx); // promote or add
+                       while(recentProbeContexts.size() > MAX_PROBE_CONTEXTS)
+                               recentProbeContexts.popValue();
+               }
+               
+               Message complete = DMT.createFNPProbeReply(id, target, nearest, 
best, counter++);
+               try {
+                       ctx.src.sendAsync(complete, null, 0, null);
+               } catch (NotConnectedException e) {
+                       Logger.error(this, "Not connected completing a probe 
request from "+ctx.src+" (forwarding completion from "+src+")");
+               }
+               return true;
+       }
+
+    private boolean handleProbeRejected(Message m, PeerNode src) {
+               long id = m.getLong(DMT.UID);
+               Long lid = new Long(id);
+               double target = m.getDouble(DMT.TARGET_LOCATION);
+               double best = m.getDouble(DMT.BEST_LOCATION);
+               double nearest = m.getDouble(DMT.NEAREST_LOCATION);
+               short htl = m.getShort(DMT.HTL);
+               short counter = m.getShort(DMT.COUNTER);
+               short reason = m.getShort(DMT.REASON);
+               if(logMINOR)
+                       Logger.minor(this, "Probe rejected: "+id+" "+target+" 
"+best+" "+nearest+" "+htl+" "+counter+" "+reason);
+               
+               ProbeContext ctx;
+               synchronized(recentProbeContexts) {
+                       ctx = (ProbeContext) recentProbeContexts.get(lid);
+                       if(ctx == null) {
+                               Logger.normal(this, "Unknown rejected probe 
request ID "+id);
+                               return false;
+                       }
+                       recentProbeContexts.push(lid, ctx); // promote or add
+                       while(recentProbeContexts.size() > MAX_PROBE_CONTEXTS)
+                               recentProbeContexts.popValue();
+               }
+               
+               return innerHandleProbeRequest(src, id, lid, target, best, 
nearest, htl, counter, false);
+    }
+
 }

Modified: trunk/freenet/src/freenet/node/PeerManager.java
===================================================================
--- trunk/freenet/src/freenet/node/PeerManager.java     2006-09-02 19:22:56 UTC 
(rev 10371)
+++ trunk/freenet/src/freenet/node/PeerManager.java     2006-09-02 20:20:57 UTC 
(rev 10372)
@@ -476,15 +476,15 @@
      * This scans the same array 4 times.  It would be better to scan once and 
execute 4 callbacks...
      * For this reason the metrics are only updated if advanced mode is enabled
      */
-    public PeerNode closerPeer(PeerNode pn, HashSet routedTo, HashSet 
notIgnored, double loc, boolean ignoreSelf, boolean calculateMisrouting) {
-       PeerNode best = _closerPeer(pn, routedTo, notIgnored, loc, ignoreSelf, 
false);
+    public PeerNode closerPeer(PeerNode pn, HashSet routedTo, HashSet 
notIgnored, double loc, boolean ignoreSelf, boolean calculateMisrouting, int 
minVersion) {
+       PeerNode best = _closerPeer(pn, routedTo, notIgnored, loc, ignoreSelf, 
false, minVersion);
        if(best == null) {
                // Backoff is an advisory mechanism for balancing rather than 
limiting load.
                // So send a request even though everything is backed off.
-               return _closerPeer(pn, routedTo, notIgnored, loc, ignoreSelf, 
true);
+               return _closerPeer(pn, routedTo, notIgnored, loc, ignoreSelf, 
true, minVersion);
        }
        if (calculateMisrouting) {
-               PeerNode nbo = _closerPeer(pn, routedTo, notIgnored, loc, 
ignoreSelf, true);
+               PeerNode nbo = _closerPeer(pn, routedTo, notIgnored, loc, 
ignoreSelf, true, minVersion);
                if(nbo != null) {
                        node.missRoutingDistance.report(distance(best, 
nbo.getLocation().getValue()));
                        int numberOfConnected = 
node.getPeerNodeStatusSize(Node.PEER_NODE_STATUS_CONNECTED);
@@ -501,7 +501,7 @@
      * Find the peer, if any, which is closer to the target location
      * than we are, and is not included in the provided set.
      */
-    private PeerNode _closerPeer(PeerNode pn, HashSet routedTo, HashSet 
notIgnored, double loc, boolean ignoreSelf, boolean ignoreBackedOff) {
+    private PeerNode _closerPeer(PeerNode pn, HashSet routedTo, HashSet 
notIgnored, double loc, boolean ignoreSelf, boolean ignoreBackedOff, int 
minVersion) {
         PeerNode[] peers;  
         synchronized (this) {
                        peers = connectedPeers;
@@ -532,6 +532,10 @@
                if(logMINOR) Logger.minor(this, "Skipping (routing backed off): 
"+p.getPeer());
                continue;
             }
+            if(minVersion > 0 && 
Version.getArbitraryBuildNumber(p.getVersion()) < minVersion) {
+               if(logMINOR) Logger.minor(this, "Skipping old version: 
"+p.getPeer());
+               continue;
+            }
             count++;
             any = p;
             double diff = distance(p, loc);

Modified: trunk/freenet/src/freenet/node/RequestSender.java
===================================================================
--- trunk/freenet/src/freenet/node/RequestSender.java   2006-09-02 19:22:56 UTC 
(rev 10371)
+++ trunk/freenet/src/freenet/node/RequestSender.java   2006-09-02 20:20:57 UTC 
(rev 10372)
@@ -128,7 +128,7 @@
             // Route it
             PeerNode next;
             double nextValue;
-            next = node.peers.closerPeer(source, nodesRoutedTo, 
nodesNotIgnored, target, true, node.isAdvancedDarknetEnabled());
+            next = node.peers.closerPeer(source, nodesRoutedTo, 
nodesNotIgnored, target, true, node.isAdvancedDarknetEnabled(), -1);
             if(next != null)
                 nextValue = next.getLocation().getValue();
             else

Modified: trunk/freenet/src/freenet/node/SSKInsertSender.java
===================================================================
--- trunk/freenet/src/freenet/node/SSKInsertSender.java 2006-09-02 19:22:56 UTC 
(rev 10371)
+++ trunk/freenet/src/freenet/node/SSKInsertSender.java 2006-09-02 20:20:57 UTC 
(rev 10372)
@@ -135,7 +135,7 @@
             // Can backtrack, so only route to nodes closer than we are to 
target.
             double nextValue;
             synchronized(node.peers) {
-                next = node.peers.closerPeer(source, nodesRoutedTo, 
nodesNotIgnored, target, true, node.isAdvancedDarknetEnabled());
+                next = node.peers.closerPeer(source, nodesRoutedTo, 
nodesNotIgnored, target, true, node.isAdvancedDarknetEnabled(), -1);
                 if(next != null)
                     nextValue = next.getLocation().getValue();
                 else

Modified: trunk/freenet/src/freenet/node/Version.java
===================================================================
--- trunk/freenet/src/freenet/node/Version.java 2006-09-02 19:22:56 UTC (rev 
10371)
+++ trunk/freenet/src/freenet/node/Version.java 2006-09-02 20:20:57 UTC (rev 
10372)
@@ -21,17 +21,17 @@
        public static final String protocolVersion = "1.0";

        /** The build number of the current revision */
-       private static final int buildNumber = 959;
+       private static final int buildNumber = 960;

        /** Oldest build of Fred we will talk to */
-       private static final int oldLastGoodBuild = 944;
-       private static final int newLastGoodBuild = 950;
+       private static final int oldLastGoodBuild = 950;
+       private static final int newLastGoodBuild = 960;
        private static final long transitionTime;

        static {
                final Calendar _cal = 
Calendar.getInstance(TimeZone.getTimeZone("GMT"));
                // year, month - 1 (or constant), day, jour, minute, second
-               _cal.set( 2006, Calendar.AUGUST, 22, 0, 0, 0 );
+               _cal.set( 2006, Calendar.SEPTEMBER, 9, 0, 0, 0 );
                transitionTime = _cal.getTimeInMillis();
        }



Reply via email to