Update of /cvsroot/freenet/freenet/src/freenet/node/states/FNP
In directory sc8-pr-cvs1:/tmp/cvs-serv25857/src/freenet/node/states/FNP
Modified Files:
Tag: stable
FNPFeedbackToken.java NewDataRequest.java
NewInsertRequest.java NewRequest.java NewVoid.java
Added Files:
Tag: stable
NewIdentify.java
Log Message:
5029: Merge from unstable after months of work. MASSIVE changes.
Highlights:
* Next Generation Routing, massive related changes
* Major changes to handling of messages and connections (PeerHandler and related
changes)
* Even more non-blocking I/O
* Documentation improvements
* Lots of new diagnostics and config options
* Lots of bug fixes and performance tweaking
* Probably lots of new bugs too!
--- NEW FILE: NewIdentify.java ---
package freenet.node.states.FNP;
import freenet.node.*;
import freenet.support.Logger;
import freenet.message.Identify;
import java.io.InputStream;
import java.io.IOException;
/**
 * State that handles an incoming Identify message.
 *
 * The node reference carried by the message is, when the routing table
 * asks for it and it is not already signed, verified via
 * {@link Identify#getVerifiedSource()}.  The resulting reference is then
 * handed to both the routing table and the connection manager.
 */
public class NewIdentify extends State {

    /**
     * @param id  the id passed through to the State superclass
     */
    public NewIdentify(long id) {
        super(id);
    }

    /**
     * @return a fixed human-readable name for this state
     */
    public String getName() {
        return "New Identify message";
    }

    /**
     * Processes an Identify message.
     *
     * If the routing table wants the (unkeyed) reference and the reference
     * is not signed, the verified form is requested from the message.  When
     * that verification throws, the failure is logged at MINOR and the
     * reference is dropped: neither the routing table nor the connection
     * manager is updated with a reference that has just failed verification.
     *
     * @param n    the node whose routing table and connections are updated
     * @param msg  the received Identify message
     * @return always null
     *         (NOTE(review): presumably null means this state is finished --
     *         confirm against the State contract)
     */
    public State receivedMessage(Node n, Identify msg) {
        NodeReference origRef = msg.getSource();
        if (n.rt.wantUnkeyedReference(origRef) && !origRef.isSigned()) {
            try {
                origRef = msg.getVerifiedSource();
            } catch (BadReferenceException e) {
                if (n.logger.shouldLog(Logger.MINOR, this))
                    n.logger.log(this, "Caught "+e+" verifying "+msg, e,
                                 Logger.MINOR);
                // Verification failed: do not fall through and install the
                // unverified reference below.
                return null;
            }
        }
        n.rt.updateReference(origRef);
        n.connections.updateReference(origRef);
        return null;
    }

    /**
     * Logs, at NORMAL priority, that this state was lost.
     *
     * @param n  the node whose logger is used
     */
    public void lost(Node n) {
        n.logger.log(this, "Lost a NewIdentify! "+this,
                     Logger.NORMAL);
    }
}
Index: FNPFeedbackToken.java
===================================================================
RCS file: /cvsroot/freenet/freenet/src/freenet/node/states/FNP/FNPFeedbackToken.java,v
retrieving revision 1.9.2.1.2.4
retrieving revision 1.9.2.1.2.5
diff -u -w -r1.9.2.1.2.4 -r1.9.2.1.2.5
--- FNPFeedbackToken.java 15 Aug 2003 03:05:49 -0000 1.9.2.1.2.4
+++ FNPFeedbackToken.java 28 Oct 2003 20:20:42 -0000 1.9.2.1.2.5
@@ -16,50 +16,66 @@
private final long id;
// the node who asked us for this
private final Peer origPeer;
+ // the original hopsToLive
+ private final int origHopsToLive;
- FNPFeedbackToken(long id, Peer origPeer) {
+ FNPFeedbackToken(long id, Peer origPeer, int hopsToLive) {
this.id = id;
this.origPeer = origPeer;
+ this.origHopsToLive = hopsToLive;
}
- public final void queryRejected(Node n, int htl, String reason, FieldSet fs,
- int unreachable, int restarted, int rejected)
+ public final void queryRejected(Node n, int htl, String reason,
+ FieldSet fs, int unreachable,
+ int restarted, int rejected,
+ MessageSendCallback cb)
throws CommunicationException {
- n.sendMessage(new QueryRejected(id, htl, reason, fs), origPeer);
+ Message m = new QueryRejected(id, htl, reason, fs);
+ n.sendMessageAsync(m, origPeer, PeerHandler.NORMAL,
+ // this is mid-request so it is relatively important that it gets through - QueryRejected's to incoming new requests are different
+ cb);
}
- public final void restarted(Node n, long millis) throws CommunicationException {
- n.sendMessage(new QueryRestarted(id), origPeer);
+ public final void restarted(Node n, long millis, MessageSendCallback cb)
+ throws CommunicationException {
+ Message m = new QueryRestarted(id);
+ // If we can't send it within the request timeout, it's not much use
+ if(cb == null)
+ n.sendMessage(m, origPeer, millis);
+ else
+ n.sendMessageAsync(m, origPeer, millis, cb);
}
public final void dataNotFound(Node n, long timeOfQuery,
MessageSendCallback cb)
throws CommunicationException {
Message m = new DataNotFound(id, timeOfQuery);
- ConnectionHandler ch = n.makeConnection(origPeer, 0);
if(cb == null)
- ch.sendMessage(m);
+ n.sendMessage(m, origPeer, Core.hopTime(origHopsToLive));
else
- ch.sendMessageAsync(m, cb);
+ n.sendMessageAsync(m, origPeer, Core.hopTime(origHopsToLive), cb);
}
- public final OutputStream dataFound(Node n, Storables storables, long ctLength)
+ public final TrailerWriter dataFound(Node n, Storables storables, long ctLength)
throws CommunicationException {
try {
FieldSet fs = new FieldSet();
storables.addTo(fs);
- ConnectionHandler ch = n.makeConnection(origPeer, 0);
- return ch.sendMessage(new DataReply(id, fs, ctLength));
+ return n.sendMessage(new DataReply(id, fs, ctLength),
+ origPeer, Core.hopTime(origHopsToLive));
} catch (UnknownHostException e) {
// this should be impossible
n.logger.log(this, "This should be impossible", e,
n.logger.ERROR);
- throw new SendFailedException(origPeer.getAddress(), e.getMessage());
+ throw new SendFailedException(origPeer.getAddress(),
+ e.getMessage());
}
}
- public final void insertReply(Node n, long millis) throws CommunicationException {
- n.sendMessage(new InsertReply(id), origPeer);
+ public final void insertReply(Node n, long millis)
+ throws CommunicationException {
+ n.sendMessage(new InsertReply(id), origPeer,
+ Core.hopTime(origHopsToLive));
}
public final void storeData(Node n, NodeReference nr, long rate,
@@ -74,7 +90,7 @@
nr = null;
}
}
- if(n.randSource.nextFloat()< n.probIncHopsSinceReset)
+ if(n.getRandSource().nextFloat()< n.probIncHopsSinceReset)
hopsSinceReset++;
// LATER...
// Use n.loadStats.resetProbability() to determine the reset
@@ -82,15 +98,14 @@
if (!n.isTransient() && (nr == null || n.loadStats.shouldReset())) {
nr = n.getNodeReference();
// Send our request rate estimate too.
- rate = (long)n.loadStats.localQueryTraffic();
+ rate = (long)(n.loadStats.localQueryTraffic() + 0.5);
hopsSinceReset = 0;
}
Message m = new StoreData(id, nr, rate, hopsSinceReset);
- ConnectionHandler ch = n.makeConnection(origPeer, 0);
if(cb == null)
- ch.sendMessage(m);
+ n.sendMessage(m, origPeer, Core.hopTime(2*origHopsToLive));
else
- ch.sendMessageAsync(m, cb);
+ n.sendMessageAsync(m, origPeer, Core.hopTime(2*origHopsToLive), cb);
}
}
Index: NewDataRequest.java
===================================================================
RCS file: /cvsroot/freenet/freenet/src/freenet/node/states/FNP/NewDataRequest.java,v
retrieving revision 1.4.6.2
retrieving revision 1.4.6.3
diff -u -w -r1.4.6.2 -r1.4.6.3
--- NewDataRequest.java 15 Aug 2003 03:05:50 -0000 1.4.6.2
+++ NewDataRequest.java 28 Oct 2003 20:20:42 -0000 1.4.6.3
@@ -37,7 +37,8 @@
n.requestDataDistribution.add(drmo.searchKey.getVal());
}
- FeedbackToken ft = new FNPFeedbackToken(id, origRec);
+ FeedbackToken ft = new FNPFeedbackToken(id, origRec,
+ drmo.hopsToLive);
RequestInitiator ri = new RequestInitiator(id,
drmo.getReceivedTime());
Pending p = new DataPending(id, (int) drmo.hopsToLive,
Index: NewInsertRequest.java
===================================================================
RCS file: /cvsroot/freenet/freenet/src/freenet/node/states/FNP/NewInsertRequest.java,v
retrieving revision 1.4.6.2
retrieving revision 1.4.6.3
diff -u -w -r1.4.6.2 -r1.4.6.3
--- NewInsertRequest.java 15 Aug 2003 03:05:50 -0000 1.4.6.2
+++ NewInsertRequest.java 28 Oct 2003 20:20:42 -0000 1.4.6.3
@@ -36,7 +36,8 @@
n.requestInsertDistribution.add(irmo.searchKey.getVal());
}
- FeedbackToken ft = new FNPFeedbackToken(id, origRec);
+ FeedbackToken ft = new FNPFeedbackToken(id, origRec,
+ irmo.hopsToLive);
RequestInitiator ri = new RequestInitiator(id,
irmo.getReceivedTime());
Pending p = new InsertPending(id, (int) irmo.hopsToLive,
Index: NewRequest.java
===================================================================
RCS file: /cvsroot/freenet/freenet/src/freenet/node/states/FNP/NewRequest.java,v
retrieving revision 1.18.2.5.2.5
retrieving revision 1.18.2.5.2.6
diff -u -w -r1.18.2.5.2.5 -r1.18.2.5.2.6
--- NewRequest.java 15 Aug 2003 03:05:50 -0000 1.18.2.5.2.5
+++ NewRequest.java 28 Oct 2003 20:20:42 -0000 1.18.2.5.2.6
@@ -36,7 +36,16 @@
long startTime = System.currentTimeMillis();
boolean shouldLog = Core.logger.shouldLog(Core.logger.DEBUG);
- origRec = n.getPeer(mo.getSource());
+ NodeReference ref = mo.getSource();
+
+ n.connections.updateReference(ref);
+
+ n.rt.updateReference(ref);
+
+ origRec = n.getPeer(ref);
+
+ PeerHandler ph =
+ n.connections.makePeerHandler(origRec.getIdentity(), ref);
long time1 = System.currentTimeMillis();
@@ -58,12 +67,17 @@
n.logger.log(this,
"Rejecting query from host of type "+vers+": "+reason,
Logger.MINOR);
+ if(ph.timeSinceLastMessageSent() > ph.rejectOldVersion(false) &&
+ !n.rejectingConnections()) {
Message m = new QueryRejected(id, mo.hopsToLive,
reason, mo.otherFields);
- ConnectionHandler ch = n.makeConnection(origRec, 0);
- ch.sendMessageAsync(m); // don't care what happens
+ n.sendMessageAsync(m, origRec, PeerHandler.EXPENDABLE,
+ null);
+ } // Slow down old, stupid nodes!
n.loadStats.receivedQuery(false);
throw new RequestAbortException(null);
+ } else {
+ ph.rejectOldVersion(true);
}
long time3 = System.currentTimeMillis();
@@ -82,7 +96,7 @@
long time5 = System.currentTimeMillis();
logTime(4, time5-time4, shouldLog);
- if (Math.exp(HTL_FACTOR*htl*htl) < 1 - n.randSource.nextDouble()){
+ if (Math.exp(HTL_FACTOR*htl*htl) < 1 - n.getRandSource().nextDouble()){
//n.logger.log(this, "Decrementing HTL",
// n.logger.DEBUG);
--htl;
@@ -100,11 +114,13 @@
n.logger.log(this,
"Rejecting query, rate limit exceeded.",
Logger.DEBUGGING);
+ if(!n.rejectingConnections()) {
Message m = new QueryRejected(id, htl, 1, reason,
// ^--- attenuate routing.
mo.otherFields);
- ConnectionHandler ch = n.makeConnection(origRec, 0);
- ch.sendMessageAsync(m); // don't care what happens
+ n.sendMessageAsync(m, origRec, PeerHandler.EXPENDABLE,
+ null);
+ }
n.loadStats.receivedQuery(false);
Core.diagnostics.occurrenceBinomial("inboundAggregateRequests",1, 0);
throw new RequestAbortException(null);
@@ -131,14 +147,14 @@
Core.logger.log(this, "Chain "+Fields.longToHex(id)+" sending Accepted "+
"at "+(time - mo.stateTime) + " millis after stateTime ("+
(time - mo.getReceivedTime())+")", Logger.DEBUG);
- ConnectionHandler ch = n.makeConnection(origRec, 0);
Message m = new Accepted(id);
if(canAsync) {
RequestSendCallback cb =
new RequestSendCallback("Accepted", n, this);
- ch.sendMessageAsync(m, cb);
+ n.sendMessageAsync(m, origRec, PeerHandler.NORMAL,
+ Core.hopTime(1), cb);
} else
- ch.sendMessage(m);
+ n.sendMessage(m, origRec, Core.hopTime(1));
// discount the seconding time from the routing time measurement
// this needs some cleaning...
long acceptTime = System.currentTimeMillis() - time;
Index: NewVoid.java
===================================================================
RCS file: /cvsroot/freenet/freenet/src/freenet/node/states/FNP/NewVoid.java,v
retrieving revision 1.1.1.1
retrieving revision 1.1.1.1.6.1
diff -u -w -r1.1.1.1 -r1.1.1.1.6.1
--- NewVoid.java 13 Jan 2002 05:24:47 -0000 1.1.1.1
+++ NewVoid.java 28 Oct 2003 20:20:42 -0000 1.1.1.1.6.1
@@ -40,6 +40,4 @@
public void lost(Node n) {
// watch me get all worked up about this...
}
-
-
}
_______________________________________________
cvs mailing list
[EMAIL PROTECTED]
http://dodo.freenetproject.org/cgi-bin/mailman/listinfo/cvs