Author: toad
Date: 2007-10-25 17:20:23 +0000 (Thu, 25 Oct 2007)
New Revision: 15562
Modified:
trunk/freenet/src/freenet/node/OpennetManager.java
trunk/freenet/src/freenet/node/RequestHandler.java
trunk/freenet/src/freenet/node/RequestSender.java
Log:
More refactoring, probably some fixes, receive the new format noderefs if they
are sent.
Modified: trunk/freenet/src/freenet/node/OpennetManager.java
===================================================================
--- trunk/freenet/src/freenet/node/OpennetManager.java 2007-10-25 16:31:34 UTC
(rev 15561)
+++ trunk/freenet/src/freenet/node/OpennetManager.java 2007-10-25 17:20:23 UTC
(rev 15562)
@@ -19,16 +19,19 @@
import freenet.io.comm.DMT;
import freenet.io.comm.DisconnectedException;
import freenet.io.comm.Message;
+import freenet.io.comm.MessageFilter;
import freenet.io.comm.NotConnectedException;
import freenet.io.comm.Peer;
import freenet.io.comm.PeerParseException;
import freenet.io.comm.ReferenceSignatureVerificationException;
+import freenet.io.xfer.BulkReceiver;
import freenet.io.xfer.BulkTransmitter;
import freenet.io.xfer.PartiallyReceivedBulk;
import freenet.support.LRUQueue;
import freenet.support.Logger;
import freenet.support.ShortBuffer;
import freenet.support.SimpleFieldSet;
+import freenet.support.SizeUtil;
import freenet.support.io.ByteArrayRandomAccessThing;
import freenet.support.transport.ip.HostnameSyntaxException;
@@ -81,7 +84,10 @@
static final int MIN_TIME_BETWEEN_OFFERS = 30*1000;
private static boolean logMINOR;
+ /** How big to pad opennet noderefs to? If they are bigger than this
then we won't send them. */
static final int PADDED_NODEREF_SIZE = 3072;
+ /** Allow for future expansion. However at any given time all noderefs
should be PADDED_NODEREF_SIZE */
+ static final int MAX_OPENNET_NODEREF_LENGTH = 32768;
public OpennetManager(Node node, NodeCryptoConfig opennetConfig) throws
NodeInitException {
logMINOR = Logger.shouldLog(Logger.MINOR, this);
@@ -501,4 +507,75 @@
}
}
+ /**
+ * Wait for an opennet noderef.
+ * @param isReply If true, wait for an FNPOpennetConnectReply[New], if
false wait for an FNPOpennetConnectDestination[New].
+ * @param uid The UID of the parent request.
+ * @return An opennet noderef.
+ */
+ public byte[] waitForOpennetNoderef(boolean isReply, PeerNode source,
long uid, ByteCounter ctr) {
+ // FIXME remove back compat code
+ MessageFilter mfReply =
+
MessageFilter.create().setSource(source).setField(DMT.UID,
uid).setTimeout(RequestSender.OPENNET_TIMEOUT).
+ setType(isReply ? DMT.FNPOpennetConnectReply :
DMT.FNPOpennetConnectDestination);
+ MessageFilter mfNewReply =
+
MessageFilter.create().setSource(source).setField(DMT.UID,
uid).setTimeout(RequestSender.OPENNET_TIMEOUT).
+ setType(isReply ? DMT.FNPOpennetConnectReplyNew :
DMT.FNPOpennetConnectDestinationNew);
+ MessageFilter mf =
+ mfReply.or(mfNewReply);
+ if(!isReply) {
+ // Also waiting for an ack
+ MessageFilter mfAck =
+
MessageFilter.create().setSource(source).setField(DMT.UID,
uid).setTimeout(RequestSender.OPENNET_TIMEOUT).setType(DMT.FNPOpennetCompletedAck);
+ mf = mfAck.or(mf);
+ }
+
mf.setMatchesDroppedConnection(true).setMatchesRestartedConnections(true);
+ Message msg;
+
+ try {
+ msg = node.usm.waitFor(mf, ctr);
+ } catch (DisconnectedException e) {
+ Logger.normal(this, "No opennet response because node
disconnected on "+this);
+ return null; // Lost connection with request source
+ }
+
+ if(msg == null) {
+ // Timeout
+ Logger.normal(this, "Timeout waiting for opennet peer
on "+this);
+ return null;
+ }
+
+ if(msg.getSpec() == DMT.FNPOpennetCompletedAck)
+ return null; // Acked (only possible if !isReply)
+
+ // FIXME remove back compat
+ if(msg.getSpec() == DMT.FNPOpennetConnectReply || msg.getSpec()
== DMT.FNPOpennetConnectDestination) {
+ return
((ShortBuffer)msg.getObject(DMT.OPENNET_NODEREF)).getData();
+ } else {
+ // New format
+ long xferUID = msg.getLong(DMT.TRANSFER_UID);
+ int paddedLength = msg.getInt(DMT.PADDED_LENGTH);
+ int realLength = msg.getInt(DMT.NODEREF_LENGTH);
+ if(paddedLength > OpennetManager.MAX_OPENNET_NODEREF_LENGTH) {
+ Logger.error(this, "Noderef too big:
"+SizeUtil.formatSize(paddedLength)+" real length
"+SizeUtil.formatSize(realLength));
+ return null;
+ }
+ if(realLength > paddedLength) {
+ Logger.error(this, "Real length larger than padded
length: "+SizeUtil.formatSize(paddedLength)+" real length
"+SizeUtil.formatSize(realLength));
+ return null;
+ }
+ byte[] buf = new byte[paddedLength];
+ ByteArrayRandomAccessThing raf = new
ByteArrayRandomAccessThing(buf);
+ PartiallyReceivedBulk prb = new PartiallyReceivedBulk(node.usm,
buf.length, Node.PACKET_SIZE, raf, false);
+ BulkReceiver br = new BulkReceiver(prb, source, xferUID);
+ if(!br.receive()) {
+ Logger.error(this, "Failed to receive noderef bulk
transfer for "+this);
+ return null;
+ }
+ byte[] noderef = new byte[realLength];
+ System.arraycopy(buf, 0, noderef, 0, realLength);
+ return noderef;
+ }
+ }
+
}
Modified: trunk/freenet/src/freenet/node/RequestHandler.java
===================================================================
--- trunk/freenet/src/freenet/node/RequestHandler.java 2007-10-25 16:31:34 UTC
(rev 15561)
+++ trunk/freenet/src/freenet/node/RequestHandler.java 2007-10-25 17:20:23 UTC
(rev 15562)
@@ -13,7 +13,9 @@
import freenet.io.comm.PeerParseException;
import freenet.io.comm.ReferenceSignatureVerificationException;
import freenet.io.xfer.BlockTransmitter;
+import freenet.io.xfer.BulkReceiver;
import freenet.io.xfer.PartiallyReceivedBlock;
+import freenet.io.xfer.PartiallyReceivedBulk;
import freenet.keys.CHKBlock;
import freenet.keys.Key;
import freenet.keys.KeyBlock;
@@ -23,6 +25,8 @@
import freenet.support.Logger;
import freenet.support.ShortBuffer;
import freenet.support.SimpleFieldSet;
+import freenet.support.SizeUtil;
+import freenet.support.io.ByteArrayRandomAccessThing;
/**
* Handle an incoming request. Does not do the actual fetching; that
@@ -320,7 +324,11 @@
finishOpennetRelay(noderef, om);
}
-
+
+ /**
+ * Send our noderef to the request source, wait for a reply, if we get
one add it. Called when either the request
+ * wasn't routed, or the node it was routed to didn't return a noderef.
+ */
private void finishOpennetNoRelayInner(OpennetManager om) {
if(logMINOR)
Logger.minor(this, "Finishing opennet: sending own reference");
@@ -336,24 +344,9 @@
// Wait for response
- MessageFilter mf =
MessageFilter.create().setSource(source).setField(DMT.UID,
uid).setTimeout(RequestSender.OPENNET_TIMEOUT).setType(DMT.FNPOpennetConnectReply);
- Message msg;
+ byte[] noderef =
+ om.waitForOpennetNoderef(true, source, uid,
this);
- try {
- msg = node.usm.waitFor(mf, this);
- } catch (DisconnectedException e) {
- Logger.normal(this, "No opennet response
because node disconnected on "+this);
- return; // Lost connection with request source
- }
-
- if(msg == null) {
- // Timeout
- Logger.normal(this, "Timeout waiting for
opennet peer on "+this);
- return;
- }
-
- byte[] noderef =
((ShortBuffer)msg.getObject(DMT.OPENNET_NODEREF)).getData();
-
SimpleFieldSet ref;
try {
ref =
PeerNode.compressedNoderefToFieldSet(noderef, 0, noderef.length);
@@ -384,7 +377,13 @@
// Oh well...
}
}
-
+
+ /**
+ * Called when the node we routed the request to returned a valid noderef,
and we don't want it.
+ * So we relay it downstream to somebody who does, and wait to relay the
response back upstream.
+ * @param noderef
+ * @param om
+ */
private void finishOpennetRelay(byte[] noderef, OpennetManager om) {
if(logMINOR)
Logger.minor(this, "Finishing opennet: relaying reference from
"+rs.successFrom());
@@ -392,32 +391,22 @@
PeerNode dataSource = rs.successFrom();
try {
- om.sendOpennetRef(false, uid, source,
om.crypto.myCompressedFullRef(), this);
+ om.sendOpennetRef(false, uid, source, noderef, this);
} catch (NotConnectedException e) {
// Lost contact with request source, nothing we can do
return;
}
- // Now wait for reply from the request source
+ // Now wait for reply from the request source.
- MessageFilter mf =
MessageFilter.create().setSource(source).setField(DMT.UID,
uid).setTimeout(RequestSender.OPENNET_TIMEOUT).setType(DMT.FNPOpennetConnectReply);
-
- Message msg;
- try {
- msg = node.usm.waitFor(mf, this);
- } catch (DisconnectedException e) {
- return; // Lost connection with request source
- }
+ byte[] newNoderef = om.waitForOpennetNoderef(true, source, uid,
this);
- if(msg == null) {
- // Timeout
- return;
- }
+ if(noderef == null) return;
- noderef =
((ShortBuffer)msg.getObject(DMT.OPENNET_NODEREF)).getData();
+ // Send it forward to the data source, if it is valid.
try {
- SimpleFieldSet fs =
PeerNode.compressedNoderefToFieldSet(noderef, 0, noderef.length);
+ SimpleFieldSet fs =
PeerNode.compressedNoderefToFieldSet(newNoderef, 0, noderef.length);
if(fs.getBoolean("opennet", false)) {
try {
om.sendOpennetRef(true, uid,
dataSource, noderef, this);
@@ -429,13 +418,6 @@
} catch (FSParseException e1) {
// Invalid, clear it
}
-
- try {
- dataSource.sendAsync(msg, null, 0, this);
- } catch (NotConnectedException e) {
- // How sad
- return;
- }
}
private Message createDataFound(KeyBlock block) {
Modified: trunk/freenet/src/freenet/node/RequestSender.java
===================================================================
--- trunk/freenet/src/freenet/node/RequestSender.java 2007-10-25 16:31:34 UTC
(rev 15561)
+++ trunk/freenet/src/freenet/node/RequestSender.java 2007-10-25 17:20:23 UTC
(rev 15562)
@@ -18,7 +18,9 @@
import freenet.io.comm.ReferenceSignatureVerificationException;
import freenet.io.comm.RetrievalException;
import freenet.io.xfer.BlockReceiver;
+import freenet.io.xfer.BulkReceiver;
import freenet.io.xfer.PartiallyReceivedBlock;
+import freenet.io.xfer.PartiallyReceivedBulk;
import freenet.keys.CHKBlock;
import freenet.keys.Key;
import freenet.keys.KeyVerifyException;
@@ -30,6 +32,8 @@
import freenet.support.Logger;
import freenet.support.ShortBuffer;
import freenet.support.SimpleFieldSet;
+import freenet.support.SizeUtil;
+import freenet.support.io.ByteArrayRandomAccessThing;
/**
* @author amphibian
@@ -652,6 +656,8 @@
} catch (DisconnectedException e) {
// Fine by me.
}
+
+ // FIXME support new format path folding
}
/**
@@ -663,60 +669,39 @@
*/
private void finishOpennet(PeerNode next) {
- try {
- MessageFilter mfAck =
MessageFilter.create().setSource(next).setField(DMT.UID,
uid).setTimeout(OPENNET_TIMEOUT).setType(DMT.FNPOpennetCompletedAck);
- MessageFilter mfConnect =
MessageFilter.create().setSource(next).setField(DMT.UID,
uid).setTimeout(OPENNET_TIMEOUT).setType(DMT.FNPOpennetConnectDestination);
- MessageFilter mf =
mfAck.or(mfConnect).setMatchesDroppedConnection(true).setMatchesRestartedConnections(true);
+ OpennetManager om;
- Message m;
- try {
- m = node.usm.waitFor(mf, this);
- } catch (DisconnectedException e) {
- return; // Ok
- }
-
- if(m == null) {
- // Timeout
- Logger.error(this, "Timed out waiting for opennet
acknowledgement on "+this+" from "+next);
- return;
- } else if(m.getSpec() == DMT.FNPOpennetCompletedAck) {
- if(logMINOR)
- Logger.minor(this, "Destination does not want to path
fold on "+this+" from "+next);
- return;
- } else if(!(m.getSpec() == DMT.FNPOpennetConnectDestination)) {
- Logger.error(this, "Got a "+m+" expecting opennet completed /
opennet connect destination on "+this+" from "+next);
- return;
- }
-
- // DMT.getSpec() === DMT.FNPOpennetConnectDestination
-
- byte[] noderef = ((ShortBuffer)
m.getObject(DMT.OPENNET_NODEREF)).getData();
-
- SimpleFieldSet ref;
- try {
- ref = PeerNode.compressedNoderefToFieldSet(noderef, 0,
noderef.length);
- } catch (FSParseException e) {
- Logger.error(this, "Could not parse opennet noderef for
"+this+" from "+next, e);
- return;
- }
-
- if(!ref.getBoolean("opennet", false)) {
- Logger.error(this, "Could not parse opennet noderef for
"+this+" from "+next);
- return;
- }
-
- OpennetManager om = node.getOpennet();
try {
- if(om == null ||
- (om != null /* prevent race */ &&
!node.addNewOpennetNode(ref))) {
+ om = node.getOpennet();
+
+ if(om == null) return; // Nothing to do
+
+ byte[] noderef = om.waitForOpennetNoderef(false, next, uid,
this);
+
+ if(noderef == null) return;
+
+ SimpleFieldSet ref =
PeerNode.compressedNoderefToFieldSet(noderef, 0, noderef.length);
+
+ if(!ref.getBoolean("opennet", false)) {
+ Logger.error(this, "Could not parse opennet noderef for
"+this+" from "+next);
+ return;
+ }
+
+ if(!node.addNewOpennetNode(ref)) {
// If we don't want it let somebody else have it
synchronized(this) {
opennetNoderef = noderef;
+ // RequestHandler will send a noderef
back up, eventually
}
return;
} else {
+ // opennetNoderef = null i.e. we want the
noderef so we won't pass it further down.
Logger.error(this, "Added opennet noderef in
"+this+" from "+next);
}
+
+ // We want the node: send our reference
+ om.sendOpennetRef(true, uid, next,
om.crypto.myCompressedFullRef(), this);
+
} catch (FSParseException e) {
Logger.error(this, "Could not parse opennet noderef for
"+this+" from "+next, e);
return;
@@ -726,17 +711,10 @@
} catch (ReferenceSignatureVerificationException e) {
Logger.error(this, "Bad signature on opennet noderef
for "+this+" from "+next+" : "+e, e);
return;
- }
-
- // Send our reference
-
- try {
- om.sendOpennetRef(true, uid, source,
om.crypto.myCompressedFullRef(), this);
} catch (NotConnectedException e) {
// Hmmm... let the LRU deal with it
if(logMINOR)
Logger.minor(this, "Not connected sending
ConnectReply on "+this+" to "+next);
- }
} finally {
synchronized(this) {
opennetFinished = true;