Author: toad
Date: 2007-12-01 17:14:09 +0000 (Sat, 01 Dec 2007)
New Revision: 16190
Added:
trunk/freenet/src/freenet/node/AnnounceSender.java
Modified:
trunk/freenet/src/freenet/node/Node.java
trunk/freenet/src/freenet/node/NodeDispatcher.java
trunk/freenet/src/freenet/node/OpennetManager.java
trunk/freenet/src/freenet/node/PeerNode.java
Log:
Opennet announcement. Untested.
Added: trunk/freenet/src/freenet/node/AnnounceSender.java
===================================================================
--- trunk/freenet/src/freenet/node/AnnounceSender.java
(rev 0)
+++ trunk/freenet/src/freenet/node/AnnounceSender.java 2007-12-01 17:14:09 UTC
(rev 16190)
@@ -0,0 +1,392 @@
+package freenet.node;
+
+import java.util.HashSet;
+
+import freenet.io.comm.ByteCounter;
+import freenet.io.comm.DMT;
+import freenet.io.comm.DisconnectedException;
+import freenet.io.comm.Message;
+import freenet.io.comm.MessageFilter;
+import freenet.io.comm.NotConnectedException;
+import freenet.io.comm.PeerParseException;
+import freenet.io.comm.ReferenceSignatureVerificationException;
+import freenet.support.Logger;
+import freenet.support.SimpleFieldSet;
+
+public class AnnounceSender implements Runnable, ByteCounter {
+
+ // Constants
+ /** Timeout given to the filters that wait for the next hop's first response
+  * (FNPAccepted or one of the rejections). Presumably milliseconds - TODO confirm
+  * against MessageFilter.setTimeout(). */
+ static final int ACCEPTED_TIMEOUT = 5000;
+ /** Timeout given to the filters that wait for the announcement to finish downstream. */
+ static final int ANNOUNCE_TIMEOUT = 240000; // longer than a regular request as have to transfer noderefs hop by hop etc
+
+ /** Peer the announcement request arrived from; replies are sent back to it. */
+ private final PeerNode source;
+ /** UID of the announcement, used in every message and filter of this exchange. */
+ private final long uid;
+ /** Opennet manager used to transfer, validate and reject noderefs. */
+ private final OpennetManager om;
+ private final Node node;
+ /** The announcement request message handed to the constructor. */
+ private Message msg;
+ /** Noderef bytes received from the source; filled in by transferNoderef(). */
+ private byte[] noderefBuf;
+ /** Noderef length as stated in the request message; filled in by transferNoderef(). */
+ private int noderefLength;
+ /** Remaining hops-to-live, read from the request and adjusted while routing. */
+ private short htl;
+ // NOTE(review): nearestLoc is not assigned in the constructor, so it starts at 0.0 - confirm whether it should come from the request.
+ private double nearestLoc;
+ /** Target location of the announcement, read from the request message. */
+ private double target;
+ // Static, but refreshed by every constructor call.
+ private static boolean logMINOR;
+
+ /**
+  * Build a sender for one incoming announcement request.
+  * The hops-to-live and the target location are read from the request message.
+  * NOTE(review): nearestLoc is left at its default (0.0) here; presumably it should
+  * also be taken from the request - confirm.
+  * @param m The announcement request message.
+  * @param uid The UID of the announcement.
+  * @param source The peer the request came from.
+  * @param om The opennet manager.
+  * @param node The node.
+  */
+ public AnnounceSender(Message m, long uid, PeerNode source, OpennetManager om, Node node) {
+     this.node = node;
+     this.om = om;
+     this.msg = m;
+     this.uid = uid;
+     this.source = source;
+     this.htl = m.getShort(DMT.HTL);
+     this.target = m.getDouble(DMT.TARGET_LOCATION); // FIXME validate
+     logMINOR = Logger.shouldLog(Logger.MINOR, this);
+ }
+
+ public void run() {
+ try {
+ realRun();
+ } catch (Throwable t) {
+ Logger.error(this, "Caught "+t+" announcing "+uid+"
from "+source, t);
+ } finally {
+ source.completedAnnounce(uid);
+ source.node.completed(uid);
+ }
+ }
+
+ /**
+  * Handle the announcement: acknowledge it to the source, fetch and check the
+  * announcer's noderef, then route the announcement to successive peers until
+  * one of them completes it, we run out of HTL, or no peer is left.
+  * Exactly one of complete(), rnf() or timedOut() is sent to the source once
+  * routing has started.
+  */
+ private void realRun() {
+     boolean hasForwarded = false;
+     try {
+         source.sendAsync(DMT.createFNPAccepted(uid), null, 0, null);
+     } catch (NotConnectedException e) {
+         return;
+     }
+     // NOTE(review): source has already been dereferenced above, so the null
+     // checks below can never fail as the code stands.
+     if(source != null) {
+         if(!transferNoderef()) return;
+     }
+
+     // NOTE(review): nearestLoc is never assigned before this point in this class,
+     // so the comparison is against the field default - confirm the intent.
+     double myLoc = node.lm.getLocation();
+     if(Location.distance(target, myLoc) < Location.distance(target, nearestLoc)) {
+         nearestLoc = myLoc;
+         htl = node.maxHTL();
+     } else {
+         if(source != null)
+             htl = node.decrementHTL(source, htl);
+     }
+
+     // Now route it.
+
+     HashSet nodesRoutedTo = new HashSet();
+     HashSet nodesNotIgnored = new HashSet();
+     while(true) {
+         if(logMINOR) Logger.minor(this, "htl="+htl);
+         if(htl == 0) {
+             // No more nodes.
+             complete();
+             return;
+         }
+
+         // Route it
+         PeerNode next = node.peers.closerPeer(source, nodesRoutedTo, nodesNotIgnored, target, true, node.isAdvancedModeEnabled(), -1, null);
+
+         if(next == null) {
+             // Backtrack
+             rnf();
+             return;
+         }
+         if(logMINOR) Logger.minor(this, "Routing request to "+next);
+         nodesRoutedTo.add(next);
+
+         if(hasForwarded)
+             htl = node.decrementHTL(source, htl);
+
+         if(!sendTo(next)) continue;
+
+         hasForwarded = true;
+
+         // Try another node unless this one accepted.
+         if(!waitForAccepted(next)) continue;
+
+         if(logMINOR) Logger.minor(this, "Got Accepted");
+
+         // Accepted, so wait for the outcome; false means try another node.
+         if(waitForCompletion(next)) return;
+     }
+ }
+
+ /**
+  * Wait for the first response of the peer we just routed to.
+  * FNPAccepted - continue with this peer.
+  * FNPRejectedLoop, FNPRejectedOverload, FNPOpennetDisabled,
+  * FNPOpennetNoderefRejected, timeout or disconnection - go to another node.
+  * @param next The peer the announcement was sent to.
+  * @return True if the peer accepted, false if another peer should be tried.
+  */
+ private boolean waitForAccepted(PeerNode next) {
+     while(true) {
+         MessageFilter mfAccepted = MessageFilter.create().setSource(next).setField(DMT.UID, uid).setTimeout(ACCEPTED_TIMEOUT).setType(DMT.FNPAccepted);
+         MessageFilter mfRejectedLoop = MessageFilter.create().setSource(next).setField(DMT.UID, uid).setTimeout(ACCEPTED_TIMEOUT).setType(DMT.FNPRejectedLoop);
+         MessageFilter mfRejectedOverload = MessageFilter.create().setSource(next).setField(DMT.UID, uid).setTimeout(ACCEPTED_TIMEOUT).setType(DMT.FNPRejectedOverload);
+         MessageFilter mfOpennetDisabled = MessageFilter.create().setSource(next).setField(DMT.UID, uid).setTimeout(ACCEPTED_TIMEOUT).setType(DMT.FNPOpennetDisabled);
+         MessageFilter mfOpennetNoderefRejected = MessageFilter.create().setSource(next).setField(DMT.UID, uid).setTimeout(ACCEPTED_TIMEOUT).setType(DMT.FNPOpennetNoderefRejected);
+
+         MessageFilter mf = mfAccepted.or(mfRejectedLoop.or(mfRejectedOverload.or(mfOpennetDisabled.or(mfOpennetNoderefRejected))));
+
+         Message reply;
+         try {
+             reply = node.usm.waitFor(mf, this);
+             if(logMINOR) Logger.minor(this, "first part got "+reply);
+         } catch (DisconnectedException e) {
+             Logger.normal(this, "Disconnected from "+next+" while waiting for Accepted on "+uid);
+             return false;
+         }
+
+         if(reply == null) {
+             if(logMINOR) Logger.minor(this, "Timeout waiting for Accepted");
+             return false;
+         }
+
+         if(reply.getSpec() == DMT.FNPRejectedLoop) {
+             if(logMINOR) Logger.minor(this, "Rejected loop");
+             return false;
+         }
+
+         if(reply.getSpec() == DMT.FNPRejectedOverload) {
+             if(logMINOR) Logger.minor(this, "Rejected: overload");
+             return false;
+         }
+
+         if(reply.getSpec() == DMT.FNPOpennetDisabled) {
+             // The message came from next, so next is the peer without opennet.
+             next.setOpennetDisabled();
+             return false;
+         }
+
+         if(reply.getSpec() == DMT.FNPOpennetNoderefRejected) {
+             int reason = reply.getInt(DMT.REJECT_CODE);
+             // It is next that rejected the noderef, not our source.
+             Logger.normal(this, "Announce rejected by "+next+" : "+DMT.getOpennetRejectedCode(reason));
+             return false;
+         }
+
+         if(reply.getSpec() == DMT.FNPAccepted)
+             return true;
+
+         // Anything else: complain and keep waiting.
+         Logger.error(this, "Unrecognized message: "+reply);
+     }
+ }
+
+ /**
+  * After a peer has accepted, wait for the announcement to finish there,
+  * relaying any announce replies it sends in the meantime.
+  * A timeout is fatal (timedOut() is sent to the source); FNPOpennetAnnounceCompleted
+  * ends the announcement (complete() is sent). FNPRouteNotFound lowers htl to the
+  * returned value if that is smaller; it, FNPRejectedOverload, FNPOpennetDisabled
+  * and disconnection all mean another peer should be tried.
+  * @param next The peer that accepted the announcement.
+  * @return True if a final reply has been sent to the source, false if the
+  * caller should route to another peer.
+  */
+ private boolean waitForCompletion(PeerNode next) {
+     while(true) {
+         MessageFilter mfAnnounceCompleted = MessageFilter.create().setSource(next).setField(DMT.UID, uid).setTimeout(ANNOUNCE_TIMEOUT).setType(DMT.FNPOpennetAnnounceCompleted);
+         MessageFilter mfRouteNotFound = MessageFilter.create().setSource(next).setField(DMT.UID, uid).setTimeout(ANNOUNCE_TIMEOUT).setType(DMT.FNPRouteNotFound);
+         MessageFilter mfRejectedOverload = MessageFilter.create().setSource(next).setField(DMT.UID, uid).setTimeout(ANNOUNCE_TIMEOUT).setType(DMT.FNPRejectedOverload);
+         MessageFilter mfAnnounceReply = MessageFilter.create().setSource(next).setField(DMT.UID, uid).setTimeout(ANNOUNCE_TIMEOUT).setType(DMT.FNPOpennetAnnounceReply);
+         MessageFilter mfOpennetDisabled = MessageFilter.create().setSource(next).setField(DMT.UID, uid).setTimeout(ANNOUNCE_TIMEOUT).setType(DMT.FNPOpennetDisabled);
+         MessageFilter mf = mfAnnounceCompleted.or(mfRouteNotFound.or(mfRejectedOverload.or(mfAnnounceReply.or(mfOpennetDisabled))));
+
+         Message reply;
+         try {
+             reply = node.usm.waitFor(mf, this);
+         } catch (DisconnectedException e) {
+             Logger.normal(this, "Disconnected from "+next+" while waiting for announcement");
+             return false;
+         }
+
+         if(logMINOR) Logger.minor(this, "second part got "+reply);
+
+         if(reply == null) {
+             // Fatal timeout
+             timedOut();
+             return true;
+         }
+
+         if(reply.getSpec() == DMT.FNPOpennetAnnounceCompleted) {
+             complete();
+             return true;
+         }
+
+         if(reply.getSpec() == DMT.FNPRouteNotFound) {
+             // Backtrack within available hops
+             short newHtl = reply.getShort(DMT.HTL);
+             if(newHtl < htl) htl = newHtl;
+             return false;
+         }
+
+         if(reply.getSpec() == DMT.FNPRejectedOverload) {
+             // Give up on this one, try another
+             return false;
+         }
+
+         if(reply.getSpec() == DMT.FNPOpennetDisabled) {
+             // The message came from next, so next is the peer without opennet.
+             next.setOpennetDisabled();
+             return false;
+         }
+
+         if(reply.getSpec() == DMT.FNPOpennetAnnounceReply) {
+             validateForwardReply(reply, next);
+             continue; // There may be more
+         }
+
+         Logger.error(this, "Unexpected message: "+reply);
+     }
+ }
+
+ /**
+  * Validate a reply, and relay it back to the source.
+  * The noderef is fetched from, and any rejection is sent to, the peer that
+  * produced the reply; the validated noderef is then relayed to the peer the
+  * announcement request originally came from.
+  * @param msg The AnnouncementReply message.
+  * @param next The peer the reply came from (the node we routed to).
+  * @return True unless we lost the connection to our request source.
+  */
+ private boolean validateForwardReply(Message msg, PeerNode next) {
+     long xferUID = msg.getLong(DMT.TRANSFER_UID);
+     int replyRefLength = msg.getInt(DMT.NODEREF_LENGTH);
+     int paddedLength = msg.getInt(DMT.PADDED_LENGTH);
+     byte[] replyRef = om.innerWaitForOpennetNoderef(xferUID, paddedLength, replyRefLength, next, false, uid, true, this);
+     if(replyRef == null) {
+         return true; // Don't relay
+     }
+     SimpleFieldSet fs = om.validateNoderef(replyRef, 0, replyRefLength, next);
+     if(fs == null) {
+         om.rejectRef(uid, next, DMT.NODEREF_REJECTED_INVALID, this);
+         return true; // Don't relay
+     }
+     // Now relay it to the request source. The parameter used to be called
+     // "source" and shadowed the field, which sent the reply straight back
+     // to the peer it had come from.
+     try {
+         om.sendAnnouncementReply(uid, source, replyRef, this);
+     } catch (NotConnectedException e) {
+         // Hmmm...!
+         return false;
+     }
+     return true;
+ }
+
+ /**
+  * Forward the announcement request, with the noderef received from the
+  * source, the target location, the current HTL and nearestLoc, to a peer.
+  * @param next The node to send the announcement to.
+  * @return True if the announcement was successfully sent, false if the
+  * peer was not connected.
+  */
+ private boolean sendTo(PeerNode next) {
+     boolean sent;
+     try {
+         om.sendAnnouncementRequest(uid, next, noderefBuf, this, target, htl, nearestLoc);
+         sent = true;
+     } catch (NotConnectedException e) {
+         if(logMINOR) Logger.minor(this, "Disconnected");
+         sent = false;
+     }
+     return sent;
+ }
+
+ /** Tell the source the announcement timed out, as a non-local FNPRejectedOverload. */
+ private void timedOut() {
+     sendToSource(DMT.createFNPRejectedOverload(uid, false));
+ }
+
+ /** Tell the source no route was found, passing back the remaining HTL. */
+ private void rnf() {
+     sendToSource(DMT.createFNPRouteNotFound(uid, htl));
+ }
+
+ /** Tell the source the announcement has completed. */
+ private void complete() {
+     sendToSource(DMT.createFNPOpennetAnnounceCompleted(uid));
+ }
+
+ /**
+  * Queue a final reply to the source, counting the bytes against this sender.
+  * A source that has already disconnected is silently ignored.
+  * @param reply The message to send.
+  */
+ private void sendToSource(Message reply) {
+     try {
+         source.sendAsync(reply, null, 0, this);
+     } catch (NotConnectedException e) {
+         // Ok - nobody left to tell.
+     }
+ }
+
+ /**
+ * @return True unless the noderef is bogus.
+ */
+ private boolean transferNoderef() {
+ long xferUID = msg.getLong(DMT.TRANSFER_UID);
+ noderefLength = msg.getInt(DMT.NODEREF_LENGTH);
+ int paddedLength = msg.getInt(DMT.PADDED_LENGTH);
+ noderefBuf = om.innerWaitForOpennetNoderef(xferUID,
paddedLength, noderefLength, source, false, uid, true, this);
+ if(noderefBuf == null) {
+ return false;
+ }
+ SimpleFieldSet fs = om.validateNoderef(noderefBuf, 0,
noderefLength, source);
+ if(fs == null) {
+ om.rejectRef(uid, source, DMT.NODEREF_REJECTED_INVALID,
this);
+ return false;
+ }
+ // If we want it, add it and send it.
+ try {
+ if(om.addNewOpennetNode(fs)) {
+ sendOurRef();
+ } else {
+ // Okay, just route it.
+ }
+ } catch (FSParseException e) {
+ om.rejectRef(uid, source, DMT.NODEREF_REJECTED_INVALID,
this);
+ return false;
+ } catch (PeerParseException e) {
+ om.rejectRef(uid, source, DMT.NODEREF_REJECTED_INVALID,
this);
+ return false;
+ } catch (ReferenceSignatureVerificationException e) {
+ om.rejectRef(uid, source, DMT.NODEREF_REJECTED_INVALID,
this);
+ return false;
+ }
+ return true;
+ }
+
+ /**
+  * Placeholder, currently a no-op. Called by transferNoderef() when the
+  * opennet manager has added the announcing node.
+  */
+ private void sendOurRef() {
+ // FIXME transmit our noderef back to the node
+ // TODO not implemented yet
+
+ }
+
+ /**
+  * Lock guarding both byte counters. Final rather than volatile: a lock
+  * object must never be replaced, or threads could end up synchronizing on
+  * different monitors.
+  */
+ private final Object totalBytesSync = new Object();
+ /** Bytes sent on behalf of this announcement; guarded by totalBytesSync. */
+ private int totalBytesSent;
+ /** Bytes received on behalf of this announcement; guarded by totalBytesSync. */
+ private int totalBytesReceived;
+
+ /** ByteCounter callback: add x to the count of bytes sent. */
+ public void sentBytes(int x) {
+     synchronized(totalBytesSync) {
+         totalBytesSent += x;
+     }
+ }
+
+ /** @return The total number of bytes reported through sentBytes(). */
+ public int getTotalSentBytes() {
+     synchronized(totalBytesSync) {
+         return totalBytesSent;
+     }
+ }
+
+ /** ByteCounter callback: add x to the count of bytes received. */
+ public void receivedBytes(int x) {
+     synchronized(totalBytesSync) {
+         totalBytesReceived += x;
+     }
+ }
+
+ /** @return The total number of bytes reported through receivedBytes(). */
+ public int getTotalReceivedBytes() {
+     synchronized(totalBytesSync) {
+         return totalBytesReceived;
+     }
+ }
+
+ /** ByteCounter callback for payload bytes; deliberately ignored. */
+ public void sentPayload(int x) {
+     // Doesn't count.
+ }
+
+}
Modified: trunk/freenet/src/freenet/node/Node.java
===================================================================
--- trunk/freenet/src/freenet/node/Node.java 2007-12-01 17:00:49 UTC (rev
16189)
+++ trunk/freenet/src/freenet/node/Node.java 2007-12-01 17:14:09 UTC (rev
16190)
@@ -2224,7 +2224,7 @@
/**
* A request completed (regardless of success).
*/
- private synchronized void completed(long id) {
+ synchronized void completed(long id) {
recentlyCompletedIDs.push(new Long(id));
while(recentlyCompletedIDs.size() > MAX_RECENTLY_COMPLETED_IDS)
recentlyCompletedIDs.pop();
Modified: trunk/freenet/src/freenet/node/NodeDispatcher.java
===================================================================
--- trunk/freenet/src/freenet/node/NodeDispatcher.java 2007-12-01 17:00:49 UTC
(rev 16189)
+++ trunk/freenet/src/freenet/node/NodeDispatcher.java 2007-12-01 17:14:09 UTC
(rev 16190)
@@ -97,6 +97,8 @@
return node.nodeUpdater.uom.handleRequestMain(m,
source);
} else if(spec == DMT.UOMSendingMain) {
return node.nodeUpdater.uom.handleSendingMain(m,
source);
+ } else if(spec == DMT.FNPOpennetAnnounceRequest) {
+ return handleAnnounceRequest(m, source);
}
if(!source.isRoutable()) return false;
@@ -141,7 +143,7 @@
// return handleProbeRejected(m, source);
// } else if(spec == DMT.FNPProbeTrace) {
// return handleProbeTrace(m, source);
- }
+ }
return false;
}
@@ -265,6 +267,49 @@
return true;
}
+ /**
+  * Handle an incoming FNPOpennetAnnounceRequest.
+  * The request is refused with FNPOpennetDisabled if opennet is off, with
+  * FNPRejectedLoop if the UID was recently completed, and with
+  * FNPRejectedOverload if the source peer may not start another announcement
+  * right now. Otherwise an AnnounceSender is started on the executor.
+  * @param m The announcement request.
+  * @param source The peer that sent it.
+  * @return Always true: the message has been dealt with.
+  */
+ private boolean handleAnnounceRequest(Message m, PeerNode source) {
+     long uid = m.getLong(DMT.UID);
+     OpennetManager om = node.getOpennet();
+
+     // Work out whether, and how, to refuse the request.
+     Message rejection = null;
+     if(om == null) {
+         rejection = DMT.createFNPOpennetDisabled(uid);
+     } else if(node.recentlyCompleted(uid)) {
+         rejection = DMT.createFNPRejectedLoop(uid);
+     } else if(!source.shouldAcceptAnnounce(uid)) {
+         node.completed(uid);
+         rejection = DMT.createFNPRejectedOverload(uid, true);
+     }
+     if(rejection != null) {
+         // No announce slot was taken for this request, so none is released:
+         // calling completedAnnounce(uid) here could free the slot held by an
+         // announcement with the same UID that is still running.
+         try {
+             source.sendAsync(rejection, null, 0, null);
+         } catch (NotConnectedException e) {
+             // Ok
+         }
+         return true;
+     }
+
+     // shouldAcceptAnnounce() registered the UID; give the slot back if the
+     // sender cannot be started. Once started, AnnounceSender.run() releases it.
+     boolean success = false;
+     try {
+         AnnounceSender sender = new AnnounceSender(m, uid, source, om, node);
+         node.executor.execute(sender, "Announcement sender for "+uid);
+         success = true;
+         return true;
+     } finally {
+         if(!success)
+             source.completedAnnounce(uid);
+     }
+ }
+
final Hashtable routedContexts = new Hashtable();
static class RoutedContext {
Modified: trunk/freenet/src/freenet/node/OpennetManager.java
===================================================================
--- trunk/freenet/src/freenet/node/OpennetManager.java 2007-12-01 17:00:49 UTC
(rev 16189)
+++ trunk/freenet/src/freenet/node/OpennetManager.java 2007-12-01 17:14:09 UTC
(rev 16190)
@@ -518,6 +518,21 @@
innerSendOpennetRef(xferUID, padded, peer);
}
+ public void sendAnnouncementReply(long uid, PeerNode peer, byte[]
noderef, ByteCounter ctr)
+ throws NotConnectedException {
+ byte[] padded = new byte[PADDED_NODEREF_SIZE];
+ if(noderef.length > padded.length) {
+ Logger.error(this, "Noderef too big: "+noderef.length+"
bytes");
+ return;
+ }
+ System.arraycopy(noderef, 0, padded, 0, noderef.length);
+ long xferUID = node.random.nextLong();
+ Message msg = DMT.createFNPOpennetAnnounceReply(uid, xferUID,
noderef.length,
+ padded.length);
+ peer.sendAsync(msg, null, 0, ctr);
+ innerSendOpennetRef(xferUID, padded, peer);
+ }
+
/**
* Wait for an opennet noderef.
* @param isReply If true, wait for an FNPOpennetConnectReply[New], if
false wait for an FNPOpennetConnectDestination[New].
Modified: trunk/freenet/src/freenet/node/PeerNode.java
===================================================================
--- trunk/freenet/src/freenet/node/PeerNode.java 2007-12-01 17:00:49 UTC
(rev 16189)
+++ trunk/freenet/src/freenet/node/PeerNode.java 2007-12-01 17:14:09 UTC
(rev 16190)
@@ -266,6 +266,7 @@
final WeakReference myRef;
/** The node is being disconnected, but it may take a while. */
private boolean disconnecting;
+
/**
* For FNP link setup:
* The initiator has to ensure that nonces send back by the
@@ -3038,4 +3039,47 @@
synchronized boolean manyPacketsClaimedSentNotReceived() {
return manyPacketsClaimedSentNotReceived;
}
+
+ /** Maximum number of announcements from this peer that may run at once. */
+ static final int MAX_SIMULTANEOUS_ANNOUNCEMENTS = 1;
+ /** An announcement is accepted only if more than this many milliseconds
+  * have passed since the last accepted one. */
+ static final int MAX_ANNOUNCE_DELAY = 1000;
+ /** System.currentTimeMillis() value when an announcement was last accepted. */
+ private long timeLastAcceptedAnnouncement;
+ /** UIDs of the announcements from this peer that are currently running.
+  * Replaced, never modified in place. */
+ private long[] runningAnnounceUIDs = new long[0];
+
+ /**
+  * Decide whether this peer may start another announcement, and if so
+  * register it as running.
+  * @param uid The UID of the announcement.
+  * @return True if the announcement was accepted and recorded; the caller
+  * must then eventually call completedAnnounce(uid). False if too many
+  * announcements are running or the last one was accepted too recently.
+  */
+ public synchronized boolean shouldAcceptAnnounce(long uid) {
+     long now = System.currentTimeMillis();
+     if(runningAnnounceUIDs.length >= MAX_SIMULTANEOUS_ANNOUNCEMENTS)
+         return false;
+     if(now - timeLastAcceptedAnnouncement <= MAX_ANNOUNCE_DELAY)
+         return false;
+     long[] newList = new long[runningAnnounceUIDs.length + 1];
+     System.arraycopy(runningAnnounceUIDs, 0, newList, 0, runningAnnounceUIDs.length);
+     newList[runningAnnounceUIDs.length] = uid;
+     // Publish the extended list; without this the UID is never recorded and
+     // the simultaneous-announcement limit never applies.
+     runningAnnounceUIDs = newList;
+     timeLastAcceptedAnnouncement = now;
+     return true;
+ }
+
+ /**
+  * Record that an announcement from this peer has finished.
+  * @param uid The UID of the announcement.
+  * @return True if the UID was registered as running and has been removed,
+  * false if it was not in the list (the list is then left untouched).
+  */
+ public synchronized boolean completedAnnounce(long uid) {
+     int matches = 0;
+     for(int i=0;i<runningAnnounceUIDs.length;i++) {
+         if(runningAnnounceUIDs[i] == uid) matches++;
+     }
+     // Unknown UID: nothing to remove. Checking first avoids writing past
+     // the end of a list sized for one fewer entry.
+     if(matches == 0) return false;
+     long[] newList = new long[runningAnnounceUIDs.length - matches];
+     int x = 0;
+     for(int i=0;i<runningAnnounceUIDs.length;i++) {
+         long l = runningAnnounceUIDs[i];
+         if(l == uid) continue;
+         newList[x++] = l;
+     }
+     runningAnnounceUIDs = newList;
+     return true;
+ }
+
+ /**
+  * Not implemented yet; currently a no-op. AnnounceSender calls this when a
+  * peer answers an announcement with FNPOpennetDisabled.
+  */
+ public void setOpennetDisabled() {
+ // FIXME
+ }
}