Update of /cvsroot/freenet/freenet/src/freenet/node
In directory sc8-pr-cvs1:/tmp/cvs-serv11501/src/freenet/node
Modified Files:
ConnectionOpener.java Main.java Node.java NodeReference.java
Log Message:
6217:
Implement PeerHandler. Web interface works. FCP does not work. Probably has
significant bugs. This is unstable after all... and others may want to look at it.
Index: ConnectionOpener.java
===================================================================
RCS file: /cvsroot/freenet/freenet/src/freenet/node/ConnectionOpener.java,v
retrieving revision 1.12
retrieving revision 1.13
diff -u -r1.12 -r1.13
--- ConnectionOpener.java 18 Sep 2003 17:48:08 -0000 1.12
+++ ConnectionOpener.java 4 Oct 2003 01:16:56 -0000 1.13
@@ -1,6 +1,7 @@
package freenet.node;
import freenet.ConnectionHandler;
import freenet.Identity;
+import freenet.OpenConnectionManager;
import freenet.node.rt.RoutingTable;
import freenet.node.Node;
import freenet.node.states.maintenance.Checkpoint;
@@ -12,13 +13,14 @@
public class ConnectionOpener implements Checkpointed {
- protected final NodeReference ref;
protected final RoutingTable rt; // to check whether still open
protected final Node node;
protected final Identity id;
+ protected final OpenConnectionManager ocm;
int currentDelay = 0;
int baseBackoffDelay = 0;
int startBackoffDelay=1000;
+ int lastBackoffDelay = 0; // last backoff before recent success
protected boolean isUnscheduled = true;
public static final Hashtable connOpeners = new Hashtable();
protected boolean logDEBUG;
@@ -27,46 +29,51 @@
return super.toString()+": isUnscheduled="+isUnscheduled;
}
- public static void scheduleConnectionOpener(NodeReference ref,
- RoutingTable rt,
- Node n) {
+ /**
+ * Schedule a ConnectionOpener to open a connection to a given
+ * node.
+ * @param aaargh if true, schedule a post-connection backoff,
+ * because something broke immediately after connection.
+ */
+ public static void scheduleConnectionOpener(Identity id,
+ Node n,
+ boolean aaargh) {
boolean logDEBUG =
n.logger.shouldLog(Logger.DEBUG,ConnectionOpener.class);
if(logDEBUG)
n.logger.log(ConnectionOpener.class,
- "Scheduling ConnectionOpener for "+ref,
+ "Scheduling ConnectionOpener for "+id,
Logger.DEBUG);
- Identity i = ref.getIdentity();
- if(!rt.references(i)) {
+ if(!needsOpen(id, n.rt, n.connections)) {
if(logDEBUG)
Core.logger.log(ConnectionOpener.class,
- "rt does not reference "+ref+": "+i,
+ "rt does not reference "+id,
Logger.DEBUG);
return;
}
synchronized(connOpeners) {
- ConnectionOpener co = (ConnectionOpener)(connOpeners.get(i));
+ ConnectionOpener co = (ConnectionOpener)(connOpeners.get(id));
if(co == null) {
- co = new ConnectionOpener(ref, rt, n);
- connOpeners.put(i, co);
+ co = new ConnectionOpener(id, n);
+ connOpeners.put(id, co);
if(logDEBUG)
- Core.logger.log(co, "Created "+co+" for "+ref,
+ Core.logger.log(co, "Created "+co+" for "+id,
Logger.DEBUG);
}
- co.reschedule();
+ co.reschedule(aaargh);
}
}
- public ConnectionOpener(NodeReference ref, RoutingTable rt, Node node) {
- this.ref = ref;
- this.rt = rt;
+ public ConnectionOpener(Identity id, Node node) {
+ this.rt = node.rt;
this.node = node;
- this.id = ref.getIdentity();
+ this.id = id;
+ this.ocm = node.connections;
logDEBUG = node.logger.shouldLog(Logger.DEBUG,this);
}
public String getCheckpointName() {
- return "Connection opener @ "+ref;
+ return "Connection opener @ "+id;
}
public long nextCheckpoint() {
@@ -74,12 +81,10 @@
if(logDEBUG)
node.logger.log(this, "nextCheckpoint() on "+this,
Logger.DEBUG);
- boolean rtReferencesMe = rt.references(id);
- boolean hasFreeConn = node.connections.findFreeConnection(id) != null;
- if((!rtReferencesMe) || hasFreeConn) {
+ boolean rtReferencesMe = needsOpen(id);
+ if(!rtReferencesMe) {
if(logDEBUG)
- node.logger.log(this, "unscheduling "+this+" (open conns: "+
- hasFreeConn+", in RT: "+
+ node.logger.log(this, "unscheduling "+this+" (needsOpen: "+
rtReferencesMe+")", Logger.DEBUG);
isUnscheduled = true;
synchronized(connOpeners) {
@@ -97,10 +102,19 @@
return isUnscheduled;
}
- public synchronized void reschedule() {
+ /**
+ * Reschedule the connection opener
+ * @param aaargh true to reschedule it using the last backoff delay
+ * recorded before we succeeded, because of a delayed failure
+ */
+ public synchronized void reschedule(boolean aaargh) {
if(logDEBUG)
node.logger.log(this, "Rescheduling "+this,
new Exception("debug"), Logger.DEBUG);
+ if(aaargh)
+ currentDelay = lastBackoffDelay;
+ // Backing off after a delayed failure
+
if(isUnscheduled) {
isUnscheduled = false;
if(logDEBUG)
@@ -116,6 +130,17 @@
node.logger.log(this, "Was already scheduled: "+this,
Logger.DEBUG); }
}
+
+ protected boolean needsOpen(Identity id) {
+ return needsOpen(id, rt, ocm);
+ }
+
+ protected static boolean needsOpen(Identity id, RoutingTable rt,
+ OpenConnectionManager ocm) {
+ if(ocm.needsConnection(id)) return true;
+ // If it is in the RT, OCM.needsConnection() will return true
+ return false;
+ }
public void checkpoint() {
logDEBUG = node.logger.shouldLog(Logger.DEBUG,this);
@@ -123,24 +148,33 @@
if(logDEBUG)
node.logger.log(this, "Running checkpoint on "+this,
Logger.DEBUG);
- Identity i = ref.getIdentity();
- if(!rt.references(i)) return;
- if(node.connections.findFreeConnection(id) != null) return;
+ if(!needsOpen(id)) return;
+ NodeReference ref = ocm.getNodeReference(id);
+ if(ref == null)
+ ref = rt.getNodeReference(id);
+ if(ref == null || ref.noPhysical()) {
+ Core.logger.log(this, "No ref or not useful ref: "+id+
+ ": "+ref, Logger.NORMAL);
+ backoff();
+ return;
+ }
long startTime = System.currentTimeMillis();
try {
- if(logMINOR)
- Core.logger.log(this, "Opening connection to "+ref,
- Logger.MINOR);
+ if(logMINOR)
+ Core.logger.log(this, "Opening connection to "+ref,
+ Logger.MINOR);
Core.diagnostics.occurrenceCounting("outboundOpenerConnections",1); //look this stat ye mighty and despair
ConnectionHandler ch =
- node.makeConnection(ref, -1); // -1 means run blocking
+ ocm.createConnection(node, node.getPeer(ref), -1);
+ // -1 means run blocking
+
long diff = System.currentTimeMillis() - startTime;
if(!ch.isCached()) rt.reportConnectionSuccess(id, diff);
// FIXME: possible race on isCached?
- if(logMINOR)
- Core.logger.log(this, "Opened connection to "+ref,
- Logger.MINOR);
-
+ if(logMINOR)
+ Core.logger.log(this, "Opened connection to "+ref,
+ Logger.MINOR);
+ lastBackoffDelay = currentDelay;
currentDelay = 0;
baseBackoffDelay = 0;
} catch (CommunicationException e) {
@@ -149,18 +183,22 @@
if(startTime > 0)
rt.reportConnectionFailure(id, System.currentTimeMillis() -
startTime);
- if(currentDelay == 0) {
- baseBackoffDelay = startBackoffDelay;
- currentDelay = startBackoffDelay +
- Core.randSource.nextInt(startBackoffDelay);
- } else {
- baseBackoffDelay = baseBackoffDelay << 1;
- currentDelay = (baseBackoffDelay >> 1) +
- Core.randSource.nextInt(baseBackoffDelay >> 1);
- }
- if(logDEBUG)
- Core.logger.log(this, "Rescheduling "+this+" in "+
- currentDelay+"ms", Logger.DEBUG);
+ backoff();
}
+ }
+
+ private void backoff() {
+ if(currentDelay == 0) {
+ baseBackoffDelay = startBackoffDelay;
+ currentDelay = startBackoffDelay +
+ Core.randSource.nextInt(startBackoffDelay);
+ } else {
+ baseBackoffDelay = baseBackoffDelay << 1;
+ currentDelay = (baseBackoffDelay >> 1) +
+ Core.randSource.nextInt(baseBackoffDelay >> 1);
+ }
+ if(logDEBUG)
+ Core.logger.log(this, "Rescheduling "+this+" in "+
+ currentDelay+"ms", Logger.DEBUG);
}
}
Index: Main.java
===================================================================
RCS file: /cvsroot/freenet/freenet/src/freenet/node/Main.java,v
retrieving revision 1.263
retrieving revision 1.264
diff -u -r1.263 -r1.264
--- Main.java 3 Oct 2003 08:35:20 -0000 1.263
+++ Main.java 4 Oct 2003 01:16:56 -0000 1.264
@@ -2721,6 +2721,8 @@
node.connections.countLRUConnections()+
"\nOCM.countOpenLRUConnections(): "+
node.connections.countOpenLRUConnections()+
+ "\nOCM.countPeerHandlers(): "+
+ node.connections.countPeerHandlers()+
"\nFnpLinkManager.activeLinks: "+
FNPmgr.countActiveLinks()+
"\nFnpLinkManager.activePeers: "+
Index: Node.java
===================================================================
RCS file: /cvsroot/freenet/freenet/src/freenet/node/Node.java,v
retrieving revision 1.213
retrieving revision 1.214
diff -u -r1.213 -r1.214
--- Node.java 3 Oct 2003 08:35:20 -0000 1.213
+++ Node.java 4 Oct 2003 01:16:56 -0000 1.214
@@ -2611,19 +2611,19 @@
* @param nr The node to connect to.
* @param timeout The time to allow in connecting.
*/
- public ConnectionHandler makeConnection(NodeReference nr, long timeout)
- throws CommunicationException {
- Peer p = getPeer(nr);
- if (p == null)
- throw new ConnectFailedException(new VoidAddress(), nr.getIdentity(),
- "Unusable node ref", true);
- return makeConnection(p, timeout);
- }
+// public ConnectionHandler makeConnection(NodeReference nr, long timeout)
+// throws CommunicationException {
+// Peer p = getPeer(nr);
+// if (p == null)
+// throw new ConnectFailedException(new VoidAddress(), nr.getIdentity(),
+// "Unusable node ref", true);
+// return makeConnection(p, timeout);
+// }
- public final ConnectionHandler makeConnection(NodeReference nr)
- throws CommunicationException {
- return makeConnection(nr, 0);
- }
+// public final ConnectionHandler makeConnection(NodeReference nr)
+// throws CommunicationException {
+// return makeConnection(nr, 0);
+// }
/**
@@ -2636,9 +2636,10 @@
* @return The trailing field stream (if there is one).
*/
public final TrailerWriter sendMessage(Message m, NodeReference nr,
- long timeout)
+ long timeout)
throws CommunicationException {
- return makeConnection(nr, timeout).sendMessage(m);
+ return connections.sendMessage(m, nr.getIdentity(), nr, timeout,
+ PeerHandler.NORMAL);
}
public final TrailerWriter sendMessage(Message m, NodeReference nr)
@@ -2646,14 +2647,42 @@
return sendMessage(m, nr, 0);
}
- public final TrailerWriter sendMessage(Message m, Peer p, long timeout)
+ public final TrailerWriter sendMessage(Message m, Peer p, long timeout,
+ int msgPrio)
+ throws CommunicationException {
+ return connections.sendMessage(m, p.getIdentity(), null, timeout, msgPrio);
+ }
+
+ /**
+ * Send a message, asynchronously
+ * @param m the message
+ * @param p the peer
+ * @param msgPrio the priority of the message, see PeerHandler
+ * @param cb callback to be called when the message send has completed
+ * (successfully or not) - null if no notification is needed.
+ */
+ public final void sendMessageAsync(Message m, Peer p, int msgPrio,
+ MessageSendCallback cb)
+ throws CommunicationException {
+ connections.sendMessageAsync(m, p.getIdentity(), null, cb, msgPrio);
+ }
+
+ public final void sendMessageAsync(Message m, Peer p,
+ MessageSendCallback cb)
throws CommunicationException {
- return makeConnection(p, timeout).sendMessage(m);
+ connections.sendMessageAsync(m, p.getIdentity(), null, cb,
+ PeerHandler.NORMAL);
}
- public final TrailerWriter sendMessage(Message m, Peer p)
+ public final void sendMessageAsync(Message m, NodeReference nr,
+ MessageSendCallback cb)
throws CommunicationException {
- return sendMessage(m, p, 0);
+ connections.sendMessageAsync(m, nr.getIdentity(), nr, cb,
+ PeerHandler.NORMAL);
+ }
+
+ public final void unsendMessage(Identity i, MessageSendCallback cb) {
+ connections.unsendMessage(i, cb);
}
public static int perturbHTL(int htl) {
@@ -2731,18 +2760,36 @@
}
}
+ /**
+ * Schedule open of a connection to a given Identity.
+ */
public void scheduleConnectionOpener(Identity id) {
if(id == null) throw new IllegalArgumentException("null identity");
+ ConnectionOpener.scheduleConnectionOpener(id, this, false);
+ }
+
+ public void scheduleConnectionBackoff(Identity id) {
+ if(id == null) throw new IllegalArgumentException("null identity");
NodeReference ref = rt.getNodeReference(id);
- if(ref != null)
- ConnectionOpener.scheduleConnectionOpener(ref, rt, this);
+ ConnectionOpener.scheduleConnectionOpener(id, this, true);
}
+ /**
+ * Add a reference for a node.
+ * @param nr the reference to be added to the routing table
+ * @param k the key this resulted from, which may be ignored by the RT
+ */
public void reference(Key k, NodeReference nr) {
rt.reference(k, nr);
- ConnectionOpener.scheduleConnectionOpener(nr, rt, this);
+ Identity id = nr.getIdentity();
+ connections.addPeer(id, nr);
+ ConnectionOpener.scheduleConnectionOpener(id, this, true);
}
+ /**
+ * Create ConnectionOpeners to open connections to all nodes in the RT,
+ * called on startup. Also creates PeerHandlers for each.
+ */
public void scheduleOpenAllConnections() {
RTDiagSnapshot rs = rt.getSnapshot();
NodeReference[] refs = rs.references();
@@ -2752,10 +2799,23 @@
NodeReference ref = refs[x];
Core.logger.log(this, "Scheduling open on "+ref+" ("+x+" of "+
refs.length, Logger.MINOR);
- ConnectionOpener.scheduleConnectionOpener(ref, rt, this);
+ Identity id = ref.getIdentity();
+ if(connections.getPeerHandler(id) == null) {
+ PeerHandler ph = new PeerHandler(id, ref, this,
+ getMaxPacketLength());
+ connections.addPeerHandler(id, ph, true);
+ }
+ ConnectionOpener.scheduleConnectionOpener(id, this, true);
}
Core.logger.log(this, "Scheduled open on all connections",
Logger.MINOR);
+ }
+
+ public int getMaxPacketLength() {
+ if(obw != null)
+ return obw.maximumPacketLength();
+ else
+ return 1492; // fixme
}
}
Index: NodeReference.java
===================================================================
RCS file: /cvsroot/freenet/freenet/src/freenet/node/NodeReference.java,v
retrieving revision 1.23
retrieving revision 1.24
diff -u -r1.23 -r1.24
--- NodeReference.java 1 Jul 2003 18:39:17 -0000 1.23
+++ NodeReference.java 4 Oct 2003 01:16:56 -0000 1.24
@@ -551,13 +551,13 @@
identity.fingerprint(),
ARKcrypt);
}
-
-
+
/**
* Returns true if the other NodeReference is to the same node, but
* with a new ARK revision value.
*/
public final boolean supersedes(NodeReference nr) {
+ if(nr == null) return true;
if(!identity.equals(nr.identity)) return false;
if(ARKrevision < nr.ARKrevision) return false;
if(noPhysical()) return false;
_______________________________________________
cvs mailing list
[EMAIL PROTECTED]
http://dodo.freenetproject.org/cgi-bin/mailman/listinfo/cvs