Author: nextgens
Date: 2006-12-06 10:43:42 +0000 (Wed, 06 Dec 2006)
New Revision: 11270

Modified:
   trunk/freenet/src/freenet/node/NodeDispatcher.java
Log:
indent


Modified: trunk/freenet/src/freenet/node/NodeDispatcher.java
===================================================================
--- trunk/freenet/src/freenet/node/NodeDispatcher.java  2006-12-06 06:40:50 UTC 
(rev 11269)
+++ trunk/freenet/src/freenet/node/NodeDispatcher.java  2006-12-06 10:43:42 UTC 
(rev 11270)
@@ -37,390 +37,390 @@
 public class NodeDispatcher implements Dispatcher {

        private static boolean logMINOR;
-    final Node node;
-    
-    NodeDispatcher(Node node) {
-        this.node = node;
-        logMINOR = Logger.shouldLog(Logger.MINOR, this);
-    }
-    
-    public boolean handleMessage(Message m) {
-        logMINOR = Logger.shouldLog(Logger.MINOR, this);
-        PeerNode source = (PeerNode)m.getSource();
-        if(logMINOR) Logger.minor(this, "Dispatching "+m+" from "+source);
-        MessageType spec = m.getSpec();
-        if(spec == DMT.FNPPing) {
-            // Send an FNPPong
-            Message reply = DMT.createFNPPong(m.getInt(DMT.PING_SEQNO));
-            try {
-                ((PeerNode)m.getSource()).sendAsync(reply, null, 0, null); // 
nothing we can do if can't contact source
-            } catch (NotConnectedException e) {
-               if(logMINOR) Logger.minor(this, "Lost connection replying to 
"+m);
-            }
-            return true;
-        }else if(spec == DMT.FNPLinkPing) {
-               long id = m.getLong(DMT.PING_SEQNO);
-               Message msg = DMT.createFNPLinkPong(id);
-               try {
+       final Node node;
+
+       NodeDispatcher(Node node) {
+               this.node = node;
+               logMINOR = Logger.shouldLog(Logger.MINOR, this);
+       }
+
+       public boolean handleMessage(Message m) {
+               logMINOR = Logger.shouldLog(Logger.MINOR, this);
+               PeerNode source = (PeerNode)m.getSource();
+               if(logMINOR) Logger.minor(this, "Dispatching "+m+" from 
"+source);
+               MessageType spec = m.getSpec();
+               if(spec == DMT.FNPPing) {
+                       // Send an FNPPong
+                       Message reply = 
DMT.createFNPPong(m.getInt(DMT.PING_SEQNO));
+                       try {
+                               ((PeerNode)m.getSource()).sendAsync(reply, 
null, 0, null); // nothing we can do if can't contact source
+                       } catch (NotConnectedException e) {
+                               if(logMINOR) Logger.minor(this, "Lost 
connection replying to "+m);
+                       }
+                       return true;
+               }else if(spec == DMT.FNPLinkPing) {
+                       long id = m.getLong(DMT.PING_SEQNO);
+                       Message msg = DMT.createFNPLinkPong(id);
+                       try {
                                source.sendAsync(msg, null, 0, null);
                        } catch (NotConnectedException e) {
                                // Ignore
                        }
-               return true;
-        } else if(spec == DMT.FNPLinkPong) {
-               long id = m.getLong(DMT.PING_SEQNO);
-               source.receivedLinkPong(id);
-               return true;
-        } else if(spec == DMT.FNPDetectedIPAddress) {
-               Peer p = (Peer) m.getObject(DMT.EXTERNAL_ADDRESS);
-               source.setRemoteDetectedPeer(p);
-               node.ipDetector.redetectAddress();
-               return true;
-        } else if(spec == DMT.FNPVoid) {
-               return true;
-        } else if(spec == DMT.nodeToNodeMessage) {
-               node.receivedNodeToNodeMessage(m);
-               return true;
-        } else if(spec == DMT.nodeToNodeTextMessage) {
-               node.receivedNodeToNodeTextMessage(m);
-               return true;
-        }
-        
-        if(!source.isRoutable()) return false;
-        
-        if(spec == DMT.FNPLocChangeNotification) {
-            double newLoc = m.getDouble(DMT.LOCATION);
-            source.updateLocation(newLoc);
-            return true;
-        } else if(spec == DMT.FNPSwapRequest) {
-            return node.lm.handleSwapRequest(m);
-        } else if(spec == DMT.FNPSwapReply) {
-            return node.lm.handleSwapReply(m);
-        } else if(spec == DMT.FNPSwapRejected) {
-            return node.lm.handleSwapRejected(m);
-        } else if(spec == DMT.FNPSwapCommit) {
-            return node.lm.handleSwapCommit(m);
-        } else if(spec == DMT.FNPSwapComplete) {
-            return node.lm.handleSwapComplete(m);
-        } else if(spec == DMT.FNPCHKDataRequest) {
-               return handleDataRequest(m, false);
-        } else if(spec == DMT.FNPSSKDataRequest) {
-            return handleDataRequest(m, true);
-        } else if(spec == DMT.FNPInsertRequest) {
-               return handleInsertRequest(m, false);
-        } else if(spec == DMT.FNPSSKInsertRequest) {
-            return handleInsertRequest(m, true);
-        } else if(spec == DMT.FNPRoutedPing) {
-            return handleRouted(m);
-        } else if(spec == DMT.FNPRoutedPong) {
-            return handleRoutedReply(m);
-        } else if(spec == DMT.FNPRoutedRejected) {
-            return handleRoutedRejected(m);
-        } else if(spec == DMT.FNPProbeRequest) {
-               return handleProbeRequest(m, source);
-        } else if(spec == DMT.FNPProbeReply) {
-               return handleProbeReply(m, source);
-        } else if(spec == DMT.FNPProbeRejected) {
-               return handleProbeRejected(m, source);
-        }
-        return false;
-    }
+                       return true;
+               } else if(spec == DMT.FNPLinkPong) {
+                       long id = m.getLong(DMT.PING_SEQNO);
+                       source.receivedLinkPong(id);
+                       return true;
+               } else if(spec == DMT.FNPDetectedIPAddress) {
+                       Peer p = (Peer) m.getObject(DMT.EXTERNAL_ADDRESS);
+                       source.setRemoteDetectedPeer(p);
+                       node.ipDetector.redetectAddress();
+                       return true;
+               } else if(spec == DMT.FNPVoid) {
+                       return true;
+               } else if(spec == DMT.nodeToNodeMessage) {
+                       node.receivedNodeToNodeMessage(m);
+                       return true;
+               } else if(spec == DMT.nodeToNodeTextMessage) {
+                       node.receivedNodeToNodeTextMessage(m);
+                       return true;
+               }

+               if(!source.isRoutable()) return false;
+
+               if(spec == DMT.FNPLocChangeNotification) {
+                       double newLoc = m.getDouble(DMT.LOCATION);
+                       source.updateLocation(newLoc);
+                       return true;
+               } else if(spec == DMT.FNPSwapRequest) {
+                       return node.lm.handleSwapRequest(m);
+               } else if(spec == DMT.FNPSwapReply) {
+                       return node.lm.handleSwapReply(m);
+               } else if(spec == DMT.FNPSwapRejected) {
+                       return node.lm.handleSwapRejected(m);
+               } else if(spec == DMT.FNPSwapCommit) {
+                       return node.lm.handleSwapCommit(m);
+               } else if(spec == DMT.FNPSwapComplete) {
+                       return node.lm.handleSwapComplete(m);
+               } else if(spec == DMT.FNPCHKDataRequest) {
+                       return handleDataRequest(m, false);
+               } else if(spec == DMT.FNPSSKDataRequest) {
+                       return handleDataRequest(m, true);
+               } else if(spec == DMT.FNPInsertRequest) {
+                       return handleInsertRequest(m, false);
+               } else if(spec == DMT.FNPSSKInsertRequest) {
+                       return handleInsertRequest(m, true);
+               } else if(spec == DMT.FNPRoutedPing) {
+                       return handleRouted(m);
+               } else if(spec == DMT.FNPRoutedPong) {
+                       return handleRoutedReply(m);
+               } else if(spec == DMT.FNPRoutedRejected) {
+                       return handleRoutedRejected(m);
+               } else if(spec == DMT.FNPProbeRequest) {
+                       return handleProbeRequest(m, source);
+               } else if(spec == DMT.FNPProbeReply) {
+                       return handleProbeReply(m, source);
+               } else if(spec == DMT.FNPProbeRejected) {
+                       return handleProbeRejected(m, source);
+               }
+               return false;
+       }
+
        /**
-     * Handle an incoming FNPDataRequest.
-     */
-    private boolean handleDataRequest(Message m, boolean isSSK) {
-        long id = m.getLong(DMT.UID);
-        if(node.recentlyCompleted(id)) {
-            Message rejected = DMT.createFNPRejectedLoop(id);
-            try {
-                ((PeerNode)(m.getSource())).sendAsync(rejected, null, 0, null);
-            } catch (NotConnectedException e) {
-                Logger.normal(this, "Rejecting data request (loop, finished): 
"+e);
-            }
-            return true;
-        }
-        String rejectReason = node.shouldRejectRequest(!isSSK, false, isSSK);
-        if(rejectReason != null) {
-               // can accept 1 CHK request every so often, but not with SSKs 
because they aren't throttled so won't sort out bwlimitDelayTime, which was the 
whole reason for accepting them when overloaded...
-               Logger.normal(this, "Rejecting request from 
"+m.getSource().getPeer()+" preemptively because "+rejectReason);
-               Message rejected = DMT.createFNPRejectedOverload(id, true);
-               try {
-                       ((PeerNode)(m.getSource())).sendAsync(rejected, null, 
0, null);
-            } catch (NotConnectedException e) {
-                Logger.normal(this, "Rejecting (overload) data request from 
"+m.getSource().getPeer()+": "+e);
-               }
-            node.completed(id);
-            return true;
-        }
-        if(!node.lockUID(id)) {
-               if(logMINOR) Logger.minor(this, "Could not lock ID "+id+" -> 
rejecting (already running)");
-            Message rejected = DMT.createFNPRejectedLoop(id);
-            try {
-                ((PeerNode)(m.getSource())).sendAsync(rejected, null, 0, null);
-            } catch (NotConnectedException e) {
-                Logger.normal(this, "Rejecting insert request from 
"+m.getSource().getPeer()+": "+e);
-            }
-            return true;
-        } else {
-               if(logMINOR) Logger.minor(this, "Locked "+id);
-        }
-        //if(!node.lockUID(id)) return false;
-        RequestHandler rh = new RequestHandler(m, id, node);
-        Thread t = new Thread(rh, "RequestHandler for UID "+id);
-        t.setDaemon(true);
-        t.start();
-        return true;
-    }
-    
-    private boolean handleInsertRequest(Message m, boolean isSSK) {
-        long id = m.getLong(DMT.UID);
-        if(node.recentlyCompleted(id)) {
-            Message rejected = DMT.createFNPRejectedLoop(id);
-            try {
-                ((PeerNode)(m.getSource())).sendAsync(rejected, null, 0, null);
-            } catch (NotConnectedException e) {
-                Logger.normal(this, "Rejecting insert request from 
"+m.getSource().getPeer()+": "+e);
-            }
-            return true;
-        }
-        // SSKs don't fix bwlimitDelayTime so shouldn't be accepted when 
overloaded.
-        String rejectReason = node.shouldRejectRequest(!isSSK, true, isSSK);
-        if(rejectReason != null) {
-               Logger.normal(this, "Rejecting insert from 
"+m.getSource().getPeer()+" preemptively because "+rejectReason);
-               Message rejected = DMT.createFNPRejectedOverload(id, true);
-               try {
-                       ((PeerNode)(m.getSource())).sendAsync(rejected, null, 
0, null);
-            } catch (NotConnectedException e) {
-                Logger.normal(this, "Rejecting (overload) insert request from 
"+m.getSource().getPeer()+": "+e);
-               }
-            node.completed(id);
-            return true;
-        }
-        if(!node.lockUID(id)) {
-               if(logMINOR) Logger.minor(this, "Could not lock ID "+id+" -> 
rejecting (already running)");
-            Message rejected = DMT.createFNPRejectedLoop(id);
-            try {
-                ((PeerNode)(m.getSource())).sendAsync(rejected, null, 0, null);
-            } catch (NotConnectedException e) {
-                Logger.normal(this, "Rejecting insert request from 
"+m.getSource().getPeer()+": "+e);
-            }
-            return true;
-        }
-        long now = System.currentTimeMillis();
-        if(m.getSpec().equals(DMT.FNPSSKInsertRequest)) {
-               SSKInsertHandler rh = new SSKInsertHandler(m, id, node, now);
-            Thread t = new Thread(rh, "InsertHandler for "+id+" on 
"+node.portNumber);
-            t.setDaemon(true);
-            t.start();
-        } else {
-               InsertHandler rh = new InsertHandler(m, id, node, now);
-               Thread t = new Thread(rh, "InsertHandler for "+id+" on 
"+node.portNumber);
-               t.setDaemon(true);
-               t.start();
-        }
-        if(logMINOR) Logger.minor(this, "Started InsertHandler for "+id);
-        return true;
-    }
+        * Handle an incoming FNPDataRequest.
+        */
+       private boolean handleDataRequest(Message m, boolean isSSK) {
+               long id = m.getLong(DMT.UID);
+               if(node.recentlyCompleted(id)) {
+                       Message rejected = DMT.createFNPRejectedLoop(id);
+                       try {
+                               ((PeerNode)(m.getSource())).sendAsync(rejected, 
null, 0, null);
+                       } catch (NotConnectedException e) {
+                               Logger.normal(this, "Rejecting data request 
(loop, finished): "+e);
+                       }
+                       return true;
+               }
+               String rejectReason = node.shouldRejectRequest(!isSSK, false, 
isSSK);
+               if(rejectReason != null) {
+                       // can accept 1 CHK request every so often, but not 
with SSKs because they aren't throttled so won't sort out bwlimitDelayTime, 
which was the whole reason for accepting them when overloaded...
+                       Logger.normal(this, "Rejecting request from 
"+m.getSource().getPeer()+" preemptively because "+rejectReason);
+                       Message rejected = DMT.createFNPRejectedOverload(id, 
true);
+                       try {
+                               ((PeerNode)(m.getSource())).sendAsync(rejected, 
null, 0, null);
+                       } catch (NotConnectedException e) {
+                               Logger.normal(this, "Rejecting (overload) data 
request from "+m.getSource().getPeer()+": "+e);
+                       }
+                       node.completed(id);
+                       return true;
+               }
+               if(!node.lockUID(id)) {
+                       if(logMINOR) Logger.minor(this, "Could not lock ID 
"+id+" -> rejecting (already running)");
+                       Message rejected = DMT.createFNPRejectedLoop(id);
+                       try {
+                               ((PeerNode)(m.getSource())).sendAsync(rejected, 
null, 0, null);
+                       } catch (NotConnectedException e) {
+                               Logger.normal(this, "Rejecting insert request 
from "+m.getSource().getPeer()+": "+e);
+                       }
+                       return true;
+               } else {
+                       if(logMINOR) Logger.minor(this, "Locked "+id);
+               }
+               //if(!node.lockUID(id)) return false;
+               RequestHandler rh = new RequestHandler(m, id, node);
+               Thread t = new Thread(rh, "RequestHandler for UID "+id);
+               t.setDaemon(true);
+               t.start();
+               return true;
+       }

-    final Hashtable routedContexts = new Hashtable();
-    
-    static class RoutedContext {
-        long createdTime;
-        long accessTime;
-        PeerNode source;
-        final HashSet routedTo;
-        final HashSet notIgnored;
-        Message msg;
-        short lastHtl;
-        
-        RoutedContext(Message msg) {
-            createdTime = accessTime = System.currentTimeMillis();
-            source = (PeerNode)msg.getSource();
-            routedTo = new HashSet();
-            notIgnored = new HashSet();
-            this.msg = msg;
-            lastHtl = msg.getShort(DMT.HTL);
-        }
-        
-        void addSent(PeerNode n) {
-            routedTo.add(n);
-        }
-    }
-    
-    /**
-     * Handle an FNPRoutedRejected message.
-     */
-    private boolean handleRoutedRejected(Message m) {
-        long id = m.getLong(DMT.UID);
-        Long lid = new Long(id);
-        RoutedContext rc = (RoutedContext) routedContexts.get(lid);
-        if(rc == null) {
-            // Gah
-            Logger.error(this, "Unrecognized FNPRoutedRejected");
-            return false; // locally originated??
-        }
-        short htl = rc.lastHtl;
-        if(rc.source != null)
-            htl = rc.source.decrementHTL(htl);
-        short ohtl = m.getShort(DMT.HTL);
-        if(ohtl < htl) htl = ohtl;
-        // Try routing to the next node
-        forward(rc.msg, id, rc.source, htl, 
rc.msg.getDouble(DMT.TARGET_LOCATION), rc);
-        return true;
-    }
+       private boolean handleInsertRequest(Message m, boolean isSSK) {
+               long id = m.getLong(DMT.UID);
+               if(node.recentlyCompleted(id)) {
+                       Message rejected = DMT.createFNPRejectedLoop(id);
+                       try {
+                               ((PeerNode)(m.getSource())).sendAsync(rejected, 
null, 0, null);
+                       } catch (NotConnectedException e) {
+                               Logger.normal(this, "Rejecting insert request 
from "+m.getSource().getPeer()+": "+e);
+                       }
+                       return true;
+               }
+               // SSKs don't fix bwlimitDelayTime so shouldn't be accepted 
when overloaded.
+               String rejectReason = node.shouldRejectRequest(!isSSK, true, 
isSSK);
+               if(rejectReason != null) {
+                       Logger.normal(this, "Rejecting insert from 
"+m.getSource().getPeer()+" preemptively because "+rejectReason);
+                       Message rejected = DMT.createFNPRejectedOverload(id, 
true);
+                       try {
+                               ((PeerNode)(m.getSource())).sendAsync(rejected, 
null, 0, null);
+                       } catch (NotConnectedException e) {
+                               Logger.normal(this, "Rejecting (overload) 
insert request from "+m.getSource().getPeer()+": "+e);
+                       }
+                       node.completed(id);
+                       return true;
+               }
+               if(!node.lockUID(id)) {
+                       if(logMINOR) Logger.minor(this, "Could not lock ID 
"+id+" -> rejecting (already running)");
+                       Message rejected = DMT.createFNPRejectedLoop(id);
+                       try {
+                               ((PeerNode)(m.getSource())).sendAsync(rejected, 
null, 0, null);
+                       } catch (NotConnectedException e) {
+                               Logger.normal(this, "Rejecting insert request 
from "+m.getSource().getPeer()+": "+e);
+                       }
+                       return true;
+               }
+               long now = System.currentTimeMillis();
+               if(m.getSpec().equals(DMT.FNPSSKInsertRequest)) {
+                       SSKInsertHandler rh = new SSKInsertHandler(m, id, node, 
now);
+                       Thread t = new Thread(rh, "InsertHandler for "+id+" on 
"+node.portNumber);
+                       t.setDaemon(true);
+                       t.start();
+               } else {
+                       InsertHandler rh = new InsertHandler(m, id, node, now);
+                       Thread t = new Thread(rh, "InsertHandler for "+id+" on 
"+node.portNumber);
+                       t.setDaemon(true);
+                       t.start();
+               }
+               if(logMINOR) Logger.minor(this, "Started InsertHandler for 
"+id);
+               return true;
+       }

-    /**
-     * Handle a routed-to-a-specific-node message.
-     * @param m
-     * @return False if we want the message put back on the queue.
-     */
-    boolean handleRouted(Message m) {
-       if(logMINOR) Logger.minor(this, "handleRouted("+m+ ')');
-        if((m.getSource() != null) && (!(m.getSource() instanceof PeerNode))) {
-            Logger.error(this, "Routed message but source "+m.getSource()+" 
not a PeerNode!");
-            return true;
-        }
-        long id = m.getLong(DMT.UID);
-        Long lid = new Long(id);
-        PeerNode pn = (PeerNode) (m.getSource());
-        short htl = m.getShort(DMT.HTL);
-        if(pn != null) htl = pn.decrementHTL(htl);
-        RoutedContext ctx;
-        ctx = (RoutedContext)routedContexts.get(lid);
-        if(ctx != null) {
-            try {
-                
((PeerNode)m.getSource()).sendAsync(DMT.createFNPRoutedRejected(id, 
(short)(htl-1)), null, 0, null);
-            } catch (NotConnectedException e) {
-               if(logMINOR) Logger.minor(this, "Lost connection rejecting "+m);
-            }
-            return true;
-        }
-        ctx = new RoutedContext(m);
-        routedContexts.put(lid, ctx);
-        // pn == null => originated locally, keep full htl
-        double target = m.getDouble(DMT.TARGET_LOCATION);
-        if(logMINOR) Logger.minor(this, "id "+id+" from "+pn+" htl "+htl+" 
target "+target);
-        if(Math.abs(node.lm.getLocation().getValue() - target) <= 
Double.MIN_VALUE) {
-               if(logMINOR) Logger.minor(this, "Dispatching "+m.getSpec()+" on 
"+node.portNumber);
-            // Handle locally
-            // Message type specific processing
-            dispatchRoutedMessage(m, pn, id);
-            return true;
-        } else if(htl == 0) {
-            Message reject = DMT.createFNPRoutedRejected(id, (short)0);
-            if(pn != null) try {
-                pn.sendAsync(reject, null, 0, null);
-            } catch (NotConnectedException e) {
-               if(logMINOR) Logger.minor(this, "Lost connection rejecting "+m);
-            }
-            return true;
-        } else {
-            return forward(m, id, pn, htl, target, ctx);
-        }
-    }
+       final Hashtable routedContexts = new Hashtable();

-    boolean handleRoutedReply(Message m) {
-        long id = m.getLong(DMT.UID);
-        if(logMINOR) Logger.minor(this, "Got reply: "+m);
-        Long lid = new Long(id);
-        RoutedContext ctx = (RoutedContext) routedContexts.get(lid);
-        if(ctx == null) {
-            Logger.error(this, "Unrecognized routed reply: "+m);
-            return false;
-        }
-        PeerNode pn = ctx.source;
-        if(pn == null) return false;
-        try {
-            pn.sendAsync(m, null, 0, null);
-        } catch (NotConnectedException e) {
-               if(logMINOR) Logger.minor(this, "Lost connection forwarding 
"+m+" to "+pn);
-        }
-        return true;
-    }
-    
-    private boolean forward(Message m, long id, PeerNode pn, short htl, double 
target, RoutedContext ctx) {
-       if(logMINOR) Logger.minor(this, "Should forward");
-        // Forward
-        m = preForward(m, htl);
-        while(true) {
-            PeerNode next = node.peers.closerPeer(pn, ctx.routedTo, 
ctx.notIgnored, target, true, node.isAdvancedDarknetEnabled(), -1);
-            if(logMINOR) Logger.minor(this, "Next: "+next+" message: "+m);
-            if(next != null) {
-               // next is connected, or at least has been => next.getPeer() 
CANNOT be null.
-               if(logMINOR) Logger.minor(this, "Forwarding "+m.getSpec()+" to 
"+next.getPeer().getPort());
-                ctx.addSent(next);
-                try {
-                    next.sendAsync(m, null, 0, null);
-                } catch (NotConnectedException e) {
-                    continue;
-                }
-            } else {
-               if(logMINOR) Logger.minor(this, "Reached dead end for 
"+m.getSpec()+" on "+node.portNumber);
-                // Reached a dead end...
-                Message reject = DMT.createFNPRoutedRejected(id, htl);
-                if(pn != null) try {
-                    pn.sendAsync(reject, null, 0, null);
-                } catch (NotConnectedException e) {
-                    Logger.error(this, "Cannot send reject message back to 
source "+pn);
-                    return true;
-                }
-            }
-            return true;
-        }
-    }
+       static class RoutedContext {
+               long createdTime;
+               long accessTime;
+               PeerNode source;
+               final HashSet routedTo;
+               final HashSet notIgnored;
+               Message msg;
+               short lastHtl;

-    /**
-     * Prepare a routed-to-node message for forwarding.
-     */
-    private Message preForward(Message m, short newHTL) {
-        m.set(DMT.HTL, newHTL); // update htl
-        if(m.getSpec() == DMT.FNPRoutedPing) {
-            int x = m.getInt(DMT.COUNTER);
-            x++;
-            m.set(DMT.COUNTER, x);
-        }
-        return m;
-    }
+               RoutedContext(Message msg) {
+                       createdTime = accessTime = System.currentTimeMillis();
+                       source = (PeerNode)msg.getSource();
+                       routedTo = new HashSet();
+                       notIgnored = new HashSet();
+                       this.msg = msg;
+                       lastHtl = msg.getShort(DMT.HTL);
+               }

-    /**
-     * Deal with a routed-to-node message that landed on this node.
-     * This is where message-type-specific code executes. 
-     * @param m
-     * @return
-     */
-    private boolean dispatchRoutedMessage(Message m, PeerNode src, long id) {
-        if(m.getSpec() == DMT.FNPRoutedPing) {
-               if(logMINOR) Logger.minor(this, "RoutedPing reached other 
side!");
-            int x = m.getInt(DMT.COUNTER);
-            Message reply = DMT.createFNPRoutedPong(id, x);
-            try {
-                src.sendAsync(reply, null, 0, null);
-            } catch (NotConnectedException e) {
-               if(logMINOR) Logger.minor(this, "Lost connection replying to 
"+m+" in dispatchRoutedMessage");
-            }
-            return true;
-        }
-        return false;
-    }
+               void addSent(PeerNode n) {
+                       routedTo.add(n);
+               }
+       }

-    // Probe request handling
+       /**
+        * Handle an FNPRoutedRejected message.
+        */
+       private boolean handleRoutedRejected(Message m) {
+               long id = m.getLong(DMT.UID);
+               Long lid = new Long(id);
+               RoutedContext rc = (RoutedContext) routedContexts.get(lid);
+               if(rc == null) {
+                       // Gah
+                       Logger.error(this, "Unrecognized FNPRoutedRejected");
+                       return false; // locally originated??
+               }
+               short htl = rc.lastHtl;
+               if(rc.source != null)
+                       htl = rc.source.decrementHTL(htl);
+               short ohtl = m.getShort(DMT.HTL);
+               if(ohtl < htl) htl = ohtl;
+               // Try routing to the next node
+               forward(rc.msg, id, rc.source, htl, 
rc.msg.getDouble(DMT.TARGET_LOCATION), rc);
+               return true;
+       }

-    long tLastReceivedProbeRequest;
-    
-    static final int MAX_PROBE_CONTEXTS = 1000;
-    static final int MAX_PROBE_IDS = 10000;
-    
-    class ProbeContext {
+       /**
+        * Handle a routed-to-a-specific-node message.
+        * @param m
+        * @return False if we want the message put back on the queue.
+        */
+       boolean handleRouted(Message m) {
+               if(logMINOR) Logger.minor(this, "handleRouted("+m+ ')');
+               if((m.getSource() != null) && (!(m.getSource() instanceof 
PeerNode))) {
+                       Logger.error(this, "Routed message but source 
"+m.getSource()+" not a PeerNode!");
+                       return true;
+               }
+               long id = m.getLong(DMT.UID);
+               Long lid = new Long(id);
+               PeerNode pn = (PeerNode) (m.getSource());
+               short htl = m.getShort(DMT.HTL);
+               if(pn != null) htl = pn.decrementHTL(htl);
+               RoutedContext ctx;
+               ctx = (RoutedContext)routedContexts.get(lid);
+               if(ctx != null) {
+                       try {
+                               
((PeerNode)m.getSource()).sendAsync(DMT.createFNPRoutedRejected(id, 
(short)(htl-1)), null, 0, null);
+                       } catch (NotConnectedException e) {
+                               if(logMINOR) Logger.minor(this, "Lost 
connection rejecting "+m);
+                       }
+                       return true;
+               }
+               ctx = new RoutedContext(m);
+               routedContexts.put(lid, ctx);
+               // pn == null => originated locally, keep full htl
+               double target = m.getDouble(DMT.TARGET_LOCATION);
+               if(logMINOR) Logger.minor(this, "id "+id+" from "+pn+" htl 
"+htl+" target "+target);
+               if(Math.abs(node.lm.getLocation().getValue() - target) <= 
Double.MIN_VALUE) {
+                       if(logMINOR) Logger.minor(this, "Dispatching 
"+m.getSpec()+" on "+node.portNumber);
+                       // Handle locally
+                       // Message type specific processing
+                       dispatchRoutedMessage(m, pn, id);
+                       return true;
+               } else if(htl == 0) {
+                       Message reject = DMT.createFNPRoutedRejected(id, 
(short)0);
+                       if(pn != null) try {
+                               pn.sendAsync(reject, null, 0, null);
+                       } catch (NotConnectedException e) {
+                               if(logMINOR) Logger.minor(this, "Lost 
connection rejecting "+m);
+                       }
+                       return true;
+               } else {
+                       return forward(m, id, pn, htl, target, ctx);
+               }
+       }

-       final PeerNode src; // FIXME make this a weak reference or something ? 
- Memory leak with high connection churn
-       final HashSet visitedPeers;
-       final ProbeCallback cb;
-       short counter;
-       short htl;
-       double nearest;
-       double best;
-       
+       boolean handleRoutedReply(Message m) {
+               long id = m.getLong(DMT.UID);
+               if(logMINOR) Logger.minor(this, "Got reply: "+m);
+               Long lid = new Long(id);
+               RoutedContext ctx = (RoutedContext) routedContexts.get(lid);
+               if(ctx == null) {
+                       Logger.error(this, "Unrecognized routed reply: "+m);
+                       return false;
+               }
+               PeerNode pn = ctx.source;
+               if(pn == null) return false;
+               try {
+                       pn.sendAsync(m, null, 0, null);
+               } catch (NotConnectedException e) {
+                       if(logMINOR) Logger.minor(this, "Lost connection 
forwarding "+m+" to "+pn);
+               }
+               return true;
+       }
+
+       private boolean forward(Message m, long id, PeerNode pn, short htl, 
double target, RoutedContext ctx) {
+               if(logMINOR) Logger.minor(this, "Should forward");
+               // Forward
+               m = preForward(m, htl);
+               while(true) {
+                       PeerNode next = node.peers.closerPeer(pn, ctx.routedTo, 
ctx.notIgnored, target, true, node.isAdvancedDarknetEnabled(), -1);
+                       if(logMINOR) Logger.minor(this, "Next: "+next+" 
message: "+m);
+                       if(next != null) {
+                               // next is connected, or at least has been => 
next.getPeer() CANNOT be null.
+                               if(logMINOR) Logger.minor(this, "Forwarding 
"+m.getSpec()+" to "+next.getPeer().getPort());
+                               ctx.addSent(next);
+                               try {
+                                       next.sendAsync(m, null, 0, null);
+                               } catch (NotConnectedException e) {
+                                       continue;
+                               }
+                       } else {
+                               if(logMINOR) Logger.minor(this, "Reached dead 
end for "+m.getSpec()+" on "+node.portNumber);
+                               // Reached a dead end...
+                               Message reject = 
DMT.createFNPRoutedRejected(id, htl);
+                               if(pn != null) try {
+                                       pn.sendAsync(reject, null, 0, null);
+                               } catch (NotConnectedException e) {
+                                       Logger.error(this, "Cannot send reject 
message back to source "+pn);
+                                       return true;
+                               }
+                       }
+                       return true;
+               }
+       }
+
+       /**
+        * Prepare a routed-to-node message for forwarding.
+        */
+       private Message preForward(Message m, short newHTL) {
+               m.set(DMT.HTL, newHTL); // update htl
+               if(m.getSpec() == DMT.FNPRoutedPing) {
+                       int x = m.getInt(DMT.COUNTER);
+                       x++;
+                       m.set(DMT.COUNTER, x);
+               }
+               return m;
+       }
+
+       /**
+        * Deal with a routed-to-node message that landed on this node.
+        * This is where message-type-specific code executes. 
+        * @param m
+        * @return
+        */
+       private boolean dispatchRoutedMessage(Message m, PeerNode src, long id) 
{
+               if(m.getSpec() == DMT.FNPRoutedPing) {
+                       if(logMINOR) Logger.minor(this, "RoutedPing reached 
other side!");
+                       int x = m.getInt(DMT.COUNTER);
+                       Message reply = DMT.createFNPRoutedPong(id, x);
+                       try {
+                               src.sendAsync(reply, null, 0, null);
+                       } catch (NotConnectedException e) {
+                               if(logMINOR) Logger.minor(this, "Lost 
connection replying to "+m+" in dispatchRoutedMessage");
+                       }
+                       return true;
+               }
+               return false;
+       }
+
+       // Probe request handling
+
+       long tLastReceivedProbeRequest;
+
+       static final int MAX_PROBE_CONTEXTS = 1000;
+       static final int MAX_PROBE_IDS = 10000;
+
+       class ProbeContext {
+
+               final PeerNode src; // FIXME make this a weak reference or 
something ? - Memory leak with high connection churn
+               final HashSet visitedPeers;
+               final ProbeCallback cb;
+               short counter;
+               short htl;
+               double nearest;
+               double best;
+
                public ProbeContext(long id, double target, double best, double 
nearest, short htl, short counter, PeerNode src, ProbeCallback cb) {
                        visitedPeers = new HashSet();
                        this.counter = counter;
@@ -430,20 +430,20 @@
                        this.src = src;
                        this.cb = cb;
                }
-       
-    }
-    
-    final LRUQueue recentProbeRequestIDs = new LRUQueue();
-    final LRUHashtable recentProbeContexts = new LRUHashtable();
-    
-    /** 
-     * Handle a probe request.
-     * Reject it if it's looped.
-     * Look up (and promote) its context object.
-     * Update its HTL, nearest-seen and best-seen.
-     * Complete it if it has run out of HTL.
-     * Otherwise forward it.
-     **/
+
+       }
+
+       final LRUQueue recentProbeRequestIDs = new LRUQueue();
+       final LRUHashtable recentProbeContexts = new LRUHashtable();
+
+       /** 
+        * Handle a probe request.
+        * Reject it if it's looped.
+        * Look up (and promote) its context object.
+        * Update its HTL, nearest-seen and best-seen.
+        * Complete it if it has run out of HTL.
+        * Otherwise forward it.
+        **/
        private boolean handleProbeRequest(Message m, PeerNode src) {
                long id = m.getLong(DMT.UID);
                Long lid = new Long(id);
@@ -475,10 +475,10 @@
                return innerHandleProbeRequest(src, id, lid, target, best, 
nearest, htl, counter, true, true, null);
        }

-    private boolean innerHandleProbeRequest(PeerNode src, long id, Long lid, 
double target, double best, 
-               double nearest, short htl, short counter, boolean checkRecent, 
boolean canReject, ProbeCallback cb) {
-       if(htl > Node.MAX_HTL) htl = Node.MAX_HTL;
-       if(htl <= 1) htl = 1;
+       private boolean innerHandleProbeRequest(PeerNode src, long id, Long 
lid, double target, double best, 
+                       double nearest, short htl, short counter, boolean 
checkRecent, boolean canReject, ProbeCallback cb) {
+               if(htl > Node.MAX_HTL) htl = Node.MAX_HTL;
+               if(htl <= 1) htl = 1;
                ProbeContext ctx = null;
                boolean rejected = false;
                boolean isNew = false;
@@ -545,18 +545,18 @@
                        nearest = ctx.nearest;
                }
                Logger.minor(this, "htl="+htl+", nearest="+nearest+", 
ctx.htl="+ctx.htl+", ctx.nearest="+ctx.nearest);
-               
+
                PeerNode[] peers = node.peers.myPeers;
-               
+
                double myLoc = node.getLocation();
                // Update best
-               
+
                if(myLoc > target && myLoc < best)
                        best = myLoc;
-               
+
                if(ctx.best > target && ctx.best < best)
                        best = ctx.best;
-               
+
                for(int i=0;i<peers.length;i++) {
                        if(!peers[i].isConnected()) {
                                if(logMINOR)
@@ -575,9 +575,9 @@
                                best = loc;
                        }
                }
-               
+
                // Update nearest, htl
-               
+
                if(PeerManager.distance(myLoc, target) < 
PeerManager.distance(nearest, target)) {
                        if(logMINOR)
                                Logger.minor(this, "Updating nearest to 
"+myLoc+" from "+nearest+" for "+target+" and resetting htl from "+htl+" to 
"+Node.MAX_HTL);
@@ -593,7 +593,7 @@
                        if(logMINOR)
                                Logger.minor(this, "Updated htl to "+htl+" - 
myLoc="+myLoc+", target="+target+", nearest="+nearest);
                }
-               
+
                // Complete ?
                if(htl == 0) {
                        if(src != null) {
@@ -609,15 +609,15 @@
                                complete("success", target, best, nearest, id, 
ctx, counter);
                        }
                }
-               
+
                // Otherwise route it
-               
+
                HashSet visited = ctx.visitedPeers;
-               
+
                while(true) {
-                       
+
                        PeerNode pn = node.peers.closerPeer(src, visited, null, 
target, true, false, 965);
-                       
+
                        if(pn == null) {
                                // Can't complete, because some HTL left
                                // Reject: RNF
@@ -633,9 +633,9 @@
                                }
                                return true;
                        }
-                       
+
                        visited.add(pn);
-                       
+
                        Message forwarded =
                                DMT.createFNPProbeRequest(id, target, nearest, 
best, htl, counter++);
                        try {
@@ -646,7 +646,7 @@
                                // Try another one
                        }
                }
-               
+
        }

        private void complete(String msg, double target, double best, double 
nearest, long id, ProbeContext ctx, short counter) {
@@ -663,7 +663,7 @@
                short counter = m.getShort(DMT.COUNTER);
                if(logMINOR)
                        Logger.minor(this, "Probe reply: "+id+ ' ' +target+ ' ' 
+best+ ' ' +nearest);
-       // Just propagate back to source
+               // Just propagate back to source
                ProbeContext ctx;
                synchronized(recentProbeContexts) {
                        ctx = (ProbeContext) recentProbeContexts.get(lid);
@@ -675,7 +675,7 @@
                        while(recentProbeContexts.size() > MAX_PROBE_CONTEXTS)
                                recentProbeContexts.popValue();
                }
-               
+
                if(ctx.src != null) {
                        Message complete = DMT.createFNPProbeReply(id, target, 
nearest, best, counter++);
                        try {
@@ -690,7 +690,7 @@
                return true;
        }

-    private boolean handleProbeRejected(Message m, PeerNode src) {
+       private boolean handleProbeRejected(Message m, PeerNode src) {
                long id = m.getLong(DMT.UID);
                Long lid = new Long(id);
                double target = m.getDouble(DMT.TARGET_LOCATION);
@@ -701,7 +701,7 @@
                short reason = m.getShort(DMT.REASON);
                if(logMINOR)
                        Logger.minor(this, "Probe rejected: "+id+ ' ' +target+ 
' ' +best+ ' ' +nearest+ ' ' +htl+ ' ' +counter+ ' ' +reason);
-               
+
                ProbeContext ctx;
                synchronized(recentProbeContexts) {
                        ctx = (ProbeContext) recentProbeContexts.get(lid);
@@ -713,9 +713,9 @@
                        while(recentProbeContexts.size() > MAX_PROBE_CONTEXTS)
                                recentProbeContexts.popValue();
                }
-               
+
                return innerHandleProbeRequest(src, id, lid, target, best, 
nearest, htl, counter, false, false, null);
-    }
+       }

        public void startProbe(double d, ProbeCallback cb) {
                long l = node.random.nextLong();
@@ -726,5 +726,4 @@
                double nodeLoc = node.getLocation();
                innerHandleProbeRequest(null, l, ll, d, (nodeLoc > d) ? nodeLoc 
: 1.0, nodeLoc, Node.MAX_HTL, (short)0, false, false, cb);
        }
-
 }


Reply via email to