Update of /cvsroot/freenet/freenet/src/freenet/node
In directory sc8-pr-cvs1:/tmp/cvs-serv11501/src/freenet/node

Modified Files:
        ConnectionOpener.java Main.java Node.java NodeReference.java 
Log Message:
6217:
Implement PeerHandler. Web interface works. FCP does not work. Probably has 
significant bugs. This is unstable after all... and others may want to look at it.


Index: ConnectionOpener.java
===================================================================
RCS file: /cvsroot/freenet/freenet/src/freenet/node/ConnectionOpener.java,v
retrieving revision 1.12
retrieving revision 1.13
diff -u -r1.12 -r1.13
--- ConnectionOpener.java       18 Sep 2003 17:48:08 -0000      1.12
+++ ConnectionOpener.java       4 Oct 2003 01:16:56 -0000       1.13
@@ -1,6 +1,7 @@
 package freenet.node;
 import freenet.ConnectionHandler;
 import freenet.Identity;
+import freenet.OpenConnectionManager;
 import freenet.node.rt.RoutingTable;
 import freenet.node.Node;
 import freenet.node.states.maintenance.Checkpoint;
@@ -12,13 +13,14 @@
 
 public class ConnectionOpener implements Checkpointed {
     
-    protected final NodeReference ref;
     protected final RoutingTable rt; // to check whether still open
     protected final Node node;
     protected final Identity id;
+    protected final OpenConnectionManager ocm;
     int currentDelay = 0;
     int baseBackoffDelay = 0;
     int startBackoffDelay=1000;
+    int lastBackoffDelay = 0; // last backoff before recent success
     protected boolean isUnscheduled = true;
     public static final Hashtable connOpeners = new Hashtable();
     protected boolean logDEBUG;
@@ -27,46 +29,51 @@
        return super.toString()+": isUnscheduled="+isUnscheduled;
     }
     
-    public static void scheduleConnectionOpener(NodeReference ref, 
-                                               RoutingTable rt, 
-                                               Node n) {
+    /**
+     * Schedule a ConnectionOpener to open a connection to a given
+     * node.
+     * @param aaargh if true, schedule a post-connection backoff,
+     * because something broke immediately after connection.
+     */
+    public static void scheduleConnectionOpener(Identity id,
+                                               Node n, 
+                                               boolean aaargh) {
        boolean logDEBUG = 
            n.logger.shouldLog(Logger.DEBUG,ConnectionOpener.class);
        if(logDEBUG)
            n.logger.log(ConnectionOpener.class, 
-                        "Scheduling ConnectionOpener for "+ref,
+                        "Scheduling ConnectionOpener for "+id,
                         Logger.DEBUG);
-       Identity i = ref.getIdentity();
-       if(!rt.references(i)) {
+       if(!needsOpen(id, n.rt, n.connections)) {
            if(logDEBUG)
                Core.logger.log(ConnectionOpener.class, 
-                               "rt does not reference "+ref+": "+i,
+                               "rt does not reference "+id,
                                Logger.DEBUG);
            return;
        }
        synchronized(connOpeners) {
-           ConnectionOpener co = (ConnectionOpener)(connOpeners.get(i));
+           ConnectionOpener co = (ConnectionOpener)(connOpeners.get(id));
            if(co == null) {
-               co = new ConnectionOpener(ref, rt, n);
-               connOpeners.put(i, co);
+               co = new ConnectionOpener(id, n);
+               connOpeners.put(id, co);
                if(logDEBUG)
-                   Core.logger.log(co, "Created "+co+" for "+ref,
+                   Core.logger.log(co, "Created "+co+" for "+id,
                                    Logger.DEBUG);
            }
-           co.reschedule();
+           co.reschedule(aaargh);
        }
     }
     
-    public ConnectionOpener(NodeReference ref, RoutingTable rt, Node node) {
-       this.ref = ref;
-       this.rt = rt;
+    public ConnectionOpener(Identity id, Node node) {
+       this.rt = node.rt;
        this.node = node;
-       this.id = ref.getIdentity();
+       this.id = id;
+       this.ocm = node.connections;
        logDEBUG = node.logger.shouldLog(Logger.DEBUG,this);
     }
     
     public String getCheckpointName() {
-        return "Connection opener @ "+ref;
+        return "Connection opener @ "+id;
     }
     
     public long nextCheckpoint() {
@@ -74,12 +81,10 @@
        if(logDEBUG)
            node.logger.log(this, "nextCheckpoint() on "+this,
                            Logger.DEBUG);
-       boolean rtReferencesMe = rt.references(id);
-       boolean hasFreeConn = node.connections.findFreeConnection(id) != null;
-       if((!rtReferencesMe) || hasFreeConn) {
+       boolean rtReferencesMe = needsOpen(id);
+       if(!rtReferencesMe) {
            if(logDEBUG)
-               node.logger.log(this, "unscheduling "+this+" (open conns: "+
-                               hasFreeConn+", in RT: "+
+               node.logger.log(this, "unscheduling "+this+" (needsOpen: "+
                                rtReferencesMe+")", Logger.DEBUG);
            isUnscheduled = true;
            synchronized(connOpeners) {
@@ -97,10 +102,19 @@
        return isUnscheduled;
     }
     
-    public synchronized void reschedule() {
+    /**
+     * Reschedule the connection opener
+     * @param aaargh true to reschedule it for twice the last backoff
+     * time before we succeeded, because of a delayed failure
+     */
+    public synchronized void reschedule(boolean aaargh) {
        if(logDEBUG)
            node.logger.log(this, "Rescheduling "+this,
                            new Exception("debug"), Logger.DEBUG);
+       if(aaargh)
+           currentDelay = lastBackoffDelay;
+       // Backing off after a delayed failure
+       
        if(isUnscheduled) {
            isUnscheduled = false;
            if(logDEBUG)
@@ -116,6 +130,17 @@
                node.logger.log(this, "Was already scheduled: "+this, 
                                Logger.DEBUG); }
     }
+
+    protected boolean needsOpen(Identity id) {
+       return needsOpen(id, rt, ocm);
+    }
+    
+    protected static boolean needsOpen(Identity id, RoutingTable rt,
+                                      OpenConnectionManager ocm) {
+       if(ocm.needsConnection(id)) return true;
+       // If is in the RT, OCM will .needsConnection
+       return false;
+    }
     
     public void checkpoint() {
        logDEBUG = node.logger.shouldLog(Logger.DEBUG,this);
@@ -123,24 +148,33 @@
        if(logDEBUG)
            node.logger.log(this, "Running checkpoint on "+this,
                            Logger.DEBUG);
-       Identity i = ref.getIdentity();
-       if(!rt.references(i)) return;
-       if(node.connections.findFreeConnection(id) != null) return;
+       if(!needsOpen(id)) return;
+       NodeReference ref = ocm.getNodeReference(id);
+       if(ref == null)
+           ref = rt.getNodeReference(id);
+       if(ref == null || ref.noPhysical()) {
+           Core.logger.log(this, "No ref or not useful ref: "+id+
+                           ": "+ref, Logger.NORMAL);
+           backoff();
+           return;
+       }
        long startTime = System.currentTimeMillis();
        try {
-               if(logMINOR)
-           Core.logger.log(this, "Opening connection to "+ref,
-                           Logger.MINOR);
+           if(logMINOR)
+               Core.logger.log(this, "Opening connection to "+ref,
+                               Logger.MINOR);
            Core.diagnostics.occurrenceCounting("outboundOpenerConnections",1); //look 
this stat ye mighty and despair
            ConnectionHandler ch = 
-               node.makeConnection(ref, -1); // -1 means run blocking
+               ocm.createConnection(node, node.getPeer(ref), -1);
+           // -1 means run blocking
+           
            long diff = System.currentTimeMillis() - startTime;
            if(!ch.isCached()) rt.reportConnectionSuccess(id, diff);
            // FIXME: possible race on isCached?
-               if(logMINOR)
-           Core.logger.log(this, "Opened connection to "+ref,
-                           Logger.MINOR);
-           
+           if(logMINOR)
+               Core.logger.log(this, "Opened connection to "+ref,
+                               Logger.MINOR);
+           lastBackoffDelay = currentDelay;
            currentDelay = 0;
            baseBackoffDelay = 0;
        } catch (CommunicationException e) {
@@ -149,18 +183,22 @@
            if(startTime > 0)
                rt.reportConnectionFailure(id, System.currentTimeMillis() -
                                           startTime);
-           if(currentDelay == 0) {
-               baseBackoffDelay = startBackoffDelay;
-               currentDelay = startBackoffDelay +
-                   Core.randSource.nextInt(startBackoffDelay);
-           } else {
-               baseBackoffDelay = baseBackoffDelay << 1;
-               currentDelay = (baseBackoffDelay >> 1) +
-                   Core.randSource.nextInt(baseBackoffDelay >> 1);
-           }
-               if(logDEBUG)
-               Core.logger.log(this, "Rescheduling "+this+" in "+
-                               currentDelay+"ms", Logger.DEBUG);
+           backoff();
        }
+    }
+    
+    private void backoff() {
+       if(currentDelay == 0) {
+           baseBackoffDelay = startBackoffDelay;
+           currentDelay = startBackoffDelay +
+               Core.randSource.nextInt(startBackoffDelay);
+       } else {
+           baseBackoffDelay = baseBackoffDelay << 1;
+           currentDelay = (baseBackoffDelay >> 1) +
+               Core.randSource.nextInt(baseBackoffDelay >> 1);
+       }
+       if(logDEBUG)
+           Core.logger.log(this, "Rescheduling "+this+" in "+
+                           currentDelay+"ms", Logger.DEBUG);
     }
 }

Index: Main.java
===================================================================
RCS file: /cvsroot/freenet/freenet/src/freenet/node/Main.java,v
retrieving revision 1.263
retrieving revision 1.264
diff -u -r1.263 -r1.264
--- Main.java   3 Oct 2003 08:35:20 -0000       1.263
+++ Main.java   4 Oct 2003 01:16:56 -0000       1.264
@@ -2721,6 +2721,8 @@
                                node.connections.countLRUConnections()+
                                "\nOCM.countOpenLRUConnections(): "+
                                node.connections.countOpenLRUConnections()+
+                               "\nOCM.countPeerHandlers(): "+
+                               node.connections.countPeerHandlers()+
                                "\nFnpLinkManager.activeLinks: "+
                                FNPmgr.countActiveLinks()+
                                "\nFnpLinkManager.activePeers: "+

Index: Node.java
===================================================================
RCS file: /cvsroot/freenet/freenet/src/freenet/node/Node.java,v
retrieving revision 1.213
retrieving revision 1.214
diff -u -r1.213 -r1.214
--- Node.java   3 Oct 2003 08:35:20 -0000       1.213
+++ Node.java   4 Oct 2003 01:16:56 -0000       1.214
@@ -2611,19 +2611,19 @@
      * @param nr        The node to connect to.
      * @param timeout   The time to allow in connecting.
      */
-    public ConnectionHandler makeConnection(NodeReference nr, long timeout) 
-                                            throws CommunicationException {
-        Peer p = getPeer(nr);
-        if (p == null)
-            throw new ConnectFailedException(new VoidAddress(), nr.getIdentity(),
-                                             "Unusable node ref", true);
-        return makeConnection(p, timeout);
-    }
+//     public ConnectionHandler makeConnection(NodeReference nr, long timeout) 
+//                                             throws CommunicationException {
+//         Peer p = getPeer(nr);
+//         if (p == null)
+//             throw new ConnectFailedException(new VoidAddress(), nr.getIdentity(),
+//                                              "Unusable node ref", true);
+//         return makeConnection(p, timeout);
+//    }
 
-    public final ConnectionHandler makeConnection(NodeReference nr)
-                                            throws CommunicationException {
-        return makeConnection(nr, 0);
-    }
+//     public final ConnectionHandler makeConnection(NodeReference nr)
+//                                             throws CommunicationException {
+//         return makeConnection(nr, 0);
+//     }
 
 
     /**
@@ -2636,9 +2636,10 @@
      * @return  The trailing field stream (if there is one).
      */
     public final TrailerWriter sendMessage(Message m, NodeReference nr, 
-                                         long timeout) 
+                                          long timeout)
        throws CommunicationException {
-        return makeConnection(nr, timeout).sendMessage(m);
+       return connections.sendMessage(m, nr.getIdentity(), nr, timeout,
+                                      PeerHandler.NORMAL);
     }
     
     public final TrailerWriter sendMessage(Message m, NodeReference nr)
@@ -2646,14 +2647,42 @@
         return sendMessage(m, nr, 0);
     }
     
-    public final TrailerWriter sendMessage(Message m, Peer p, long timeout) 
+    public final TrailerWriter sendMessage(Message m, Peer p, long timeout,
+                                          int msgPrio) 
+       throws CommunicationException {
+       return connections.sendMessage(m, p.getIdentity(), null, timeout, msgPrio);
+    }
+    
+    /**
+     * Send a message, asynchronously
+     * @param m the message
+     * @param p the peer
+     * @param msgPrio the priority of the message, see PeerHandler
+     * @param cb callback to be called when the message send has completed
+     * (successfully or not) - null if no notification is needed.
+     */
+    public final void sendMessageAsync(Message m, Peer p, int msgPrio,
+                                      MessageSendCallback cb) 
+       throws CommunicationException {
+       connections.sendMessageAsync(m, p.getIdentity(), null, cb, msgPrio);
+    }
+    
+    public final void sendMessageAsync(Message m, Peer p,
+                                      MessageSendCallback cb) 
        throws CommunicationException {
-        return makeConnection(p, timeout).sendMessage(m);
+       connections.sendMessageAsync(m, p.getIdentity(), null, cb, 
+                                    PeerHandler.NORMAL);
     }
     
-    public final TrailerWriter sendMessage(Message m, Peer p)
+    public final void sendMessageAsync(Message m, NodeReference nr,
+                                      MessageSendCallback cb)
        throws CommunicationException {
-        return sendMessage(m, p, 0);
+       connections.sendMessageAsync(m, nr.getIdentity(), nr, cb,
+                                    PeerHandler.NORMAL);
+    }
+    
+    public final void unsendMessage(Identity i, MessageSendCallback cb) {
+       connections.unsendMessage(i, cb);
     }
     
     public static int perturbHTL(int htl) {
@@ -2731,18 +2760,36 @@
        }
     }
     
+    /**
+     * Schedule open of a connection to a given Identity.
+     */
     public void scheduleConnectionOpener(Identity id) {
        if(id == null) throw new IllegalArgumentException("null identity");
+       ConnectionOpener.scheduleConnectionOpener(id, this, false);
+    }
+    
+    public void scheduleConnectionBackoff(Identity id) {
+       if(id == null) throw new IllegalArgumentException("null identity");
        NodeReference ref = rt.getNodeReference(id);
-       if(ref != null) 
-           ConnectionOpener.scheduleConnectionOpener(ref, rt, this);
+       ConnectionOpener.scheduleConnectionOpener(id, this, true);
     }
     
+    /**
+     * Add a reference for a node.
+     * @param nr the reference to be added to the routing table
+     * @param k the key this resulted from, which may be ignored by the RT
+     */
     public void reference(Key k, NodeReference nr) {
        rt.reference(k, nr);
-       ConnectionOpener.scheduleConnectionOpener(nr, rt, this);
+       Identity id = nr.getIdentity();
+       connections.addPeer(id, nr);
+       ConnectionOpener.scheduleConnectionOpener(id, this, true);
     }
     
+    /**
+     * Create ConnectionOpeners to open connections to all nodes in the RT,
+     * called on startup. Also creates PeerHandlers for each.
+     */
     public void scheduleOpenAllConnections() {
        RTDiagSnapshot rs = rt.getSnapshot();
        NodeReference[] refs = rs.references();
@@ -2752,10 +2799,23 @@
            NodeReference ref = refs[x];
            Core.logger.log(this, "Scheduling open on "+ref+" ("+x+" of "+
                            refs.length, Logger.MINOR);
-           ConnectionOpener.scheduleConnectionOpener(ref, rt, this);
+           Identity id = ref.getIdentity();
+           if(connections.getPeerHandler(id) == null) {
+               PeerHandler ph = new PeerHandler(id, ref, this,
+                                                getMaxPacketLength());
+               connections.addPeerHandler(id, ph, true);
+           }
+           ConnectionOpener.scheduleConnectionOpener(id, this, true);
        }
        Core.logger.log(this, "Scheduled open on all connections",
                        Logger.MINOR);
+    }
+    
+    public int getMaxPacketLength() {
+       if(obw != null)
+           return obw.maximumPacketLength();
+       else
+           return 1492; // fixme
     }
 }
 

Index: NodeReference.java
===================================================================
RCS file: /cvsroot/freenet/freenet/src/freenet/node/NodeReference.java,v
retrieving revision 1.23
retrieving revision 1.24
diff -u -r1.23 -r1.24
--- NodeReference.java  1 Jul 2003 18:39:17 -0000       1.23
+++ NodeReference.java  4 Oct 2003 01:16:56 -0000       1.24
@@ -551,13 +551,13 @@
                            identity.fingerprint(), 
                            ARKcrypt);
     }
-
-
+    
     /**
      * Returns true if the other NodeReference is to the same node, but
      * with a new ARK revision value.
      */
     public final boolean supersedes(NodeReference nr) {
+       if(nr == null) return true;
        if(!identity.equals(nr.identity)) return false;
        if(ARKrevision < nr.ARKrevision) return false;
        if(noPhysical()) return false;

_______________________________________________
cvs mailing list
[EMAIL PROTECTED]
http://dodo.freenetproject.org/cgi-bin/mailman/listinfo/cvs

Reply via email to