Author: toad
Date: 2007-07-12 19:21:59 +0000 (Thu, 12 Jul 2007)
New Revision: 14047
Modified:
trunk/freenet/src/freenet/io/comm/DMT.java
trunk/freenet/src/freenet/node/FNPPacketMangler.java
trunk/freenet/src/freenet/node/Node.java
trunk/freenet/src/freenet/node/NodeCrypto.java
trunk/freenet/src/freenet/node/OpennetManager.java
trunk/freenet/src/freenet/node/OutgoingPacketMangler.java
trunk/freenet/src/freenet/node/PeerManager.java
trunk/freenet/src/freenet/node/PeerNode.java
trunk/freenet/src/freenet/node/RequestHandler.java
trunk/freenet/src/freenet/node/RequestSender.java
Log:
Path folding - untested, no LRU, no churn control (at the moment we add peers
until we reach 30, then stop).
Modified: trunk/freenet/src/freenet/io/comm/DMT.java
===================================================================
--- trunk/freenet/src/freenet/io/comm/DMT.java 2007-07-12 17:43:30 UTC (rev
14046)
+++ trunk/freenet/src/freenet/io/comm/DMT.java 2007-07-12 19:21:59 UTC (rev
14047)
@@ -757,17 +757,6 @@
return msg;
}
- /** Path folding rejected by request sender */
- public static MessageType FNPOpennetConnectRejected = new
MessageType("FNPOpennetConnectRejected") {{
- addField(UID, Long.class);
- }};
-
- public static Message createFNPOpennetConnectRejected(long uid) {
- Message msg = new Message(FNPOpennetConnectRejected);
- msg.set(UID, uid);
- return msg;
- }
-
// Key offers (ULPRs)
public static MessageType FNPOfferKey = new MessageType("FNPOfferKey")
{{
Modified: trunk/freenet/src/freenet/node/FNPPacketMangler.java
===================================================================
--- trunk/freenet/src/freenet/node/FNPPacketMangler.java 2007-07-12
17:43:30 UTC (rev 14046)
+++ trunk/freenet/src/freenet/node/FNPPacketMangler.java 2007-07-12
19:21:59 UTC (rev 14047)
@@ -1564,4 +1564,8 @@
public Peer[] getPrimaryIPAddress() {
return crypto.detector.getPrimaryPeers();
}
+
+ public byte[] getCompressedNoderef() {
+ return crypto.myCompressedFullRef();
+ }
}
Modified: trunk/freenet/src/freenet/node/Node.java
===================================================================
--- trunk/freenet/src/freenet/node/Node.java 2007-07-12 17:43:30 UTC (rev
14046)
+++ trunk/freenet/src/freenet/node/Node.java 2007-07-12 19:21:59 UTC (rev
14047)
@@ -2569,6 +2569,12 @@
public DarknetPeerNode createNewDarknetNode(SimpleFieldSet fs) throws
FSParseException, PeerParseException, ReferenceSignatureVerificationException {
return new DarknetPeerNode(fs, this, darknetCrypto, peers, false,
darknetCrypto.packetMangler);
}
+
+ public boolean addNewOpennetNode(SimpleFieldSet fs) throws
FSParseException, PeerParseException, ReferenceSignatureVerificationException {
+ if(opennet == null) return false;
+ return opennet.addNewOpennetNode(fs);
+ }
+
public byte[] getDarknetIdentity() {
return darknetCrypto.myIdentity;
@@ -2612,4 +2618,8 @@
return opennet.crypto.portNumber;
}
+ OpennetManager getOpennet() {
+ return opennet;
+ }
+
}
Modified: trunk/freenet/src/freenet/node/NodeCrypto.java
===================================================================
--- trunk/freenet/src/freenet/node/NodeCrypto.java 2007-07-12 17:43:30 UTC
(rev 14046)
+++ trunk/freenet/src/freenet/node/NodeCrypto.java 2007-07-12 19:21:59 UTC
(rev 14047)
@@ -328,12 +328,8 @@
}
}
- /**
- * The part of our node reference which is exchanged in the connection
setup, compressed.
- * @see exportSetupFieldSet()
- */
- public byte[] myCompressedSetupRef() {
- SimpleFieldSet fs = exportPublicFieldSet(true);
+ private byte[] myCompressedRef(boolean setup) {
+ SimpleFieldSet fs = exportPublicFieldSet(setup);
ByteArrayOutputStream baos = new ByteArrayOutputStream();
DeflaterOutputStream gis;
gis = new DeflaterOutputStream(baos);
@@ -360,7 +356,23 @@
System.arraycopy(buf, 0, obuf, 1, buf.length);
return obuf;
}
+
+ /**
+ * The part of our node reference which is exchanged in the connection
setup, compressed.
+ * @see exportSetupFieldSet()
+ */
+ public byte[] myCompressedSetupRef() {
+ return myCompressedRef(true);
+ }
+ /**
+ * Our full node reference, compressed.
+ * @see #exportPublicFieldSet(boolean)
+ */
+ public byte[] myCompressedFullRef() {
+ return myCompressedRef(false);
+ }
+
void addPrivateFields(SimpleFieldSet fs) {
fs.put("dsaPrivKey", privKey.asFieldSet());
fs.putSingle("ark.privURI",
myARK.getInsertURI().toString(false, false));
@@ -405,5 +417,5 @@
}
return true;
}
-
+
}
Modified: trunk/freenet/src/freenet/node/OpennetManager.java
===================================================================
--- trunk/freenet/src/freenet/node/OpennetManager.java 2007-07-12 17:43:30 UTC
(rev 14046)
+++ trunk/freenet/src/freenet/node/OpennetManager.java 2007-07-12 19:21:59 UTC
(rev 14047)
@@ -11,9 +11,11 @@
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
+import java.util.Arrays;
import freenet.io.comm.Peer;
import freenet.io.comm.PeerParseException;
+import freenet.io.comm.ReferenceSignatureVerificationException;
import freenet.support.Logger;
import freenet.support.SimpleFieldSet;
@@ -28,6 +30,11 @@
final Node node;
final NodeCrypto crypto;
+
+ // FIXME make this configurable
+ static final int MAX_PEERS = 30;
+ // Chance of resetting path folding (for plausible deniability) is 1 in
this number.
+ static final int RESET_PATH_FOLDING_PROB = 20;
public OpennetManager(Node node, NodeCryptoConfig opennetConfig) throws
NodeInitException {
this.node = node;
@@ -123,4 +130,20 @@
node.peers.removeOpennetPeers();
}
+ public boolean addNewOpennetNode(SimpleFieldSet fs) throws
FSParseException, PeerParseException, ReferenceSignatureVerificationException {
+ OpennetPeerNode pn = new OpennetPeerNode(fs, node, crypto,
node.peers, false, crypto.packetMangler);
+ if(Arrays.equals(pn.getIdentity(), crypto.myIdentity))
+ return false; // Equal to myself
+ if(node.peers.containsPeer(pn))
+ return false;
+ if(!wantPeer()) return false;
+ return node.peers.addPeer(pn); // False = already in peers list
+ }
+
+ public boolean wantPeer() {
+ // FIXME implement LRU !!!
+ if(node.peers.getOpennetPeers().length >= MAX_PEERS) return
false;
+ return true;
+ }
+
}
Modified: trunk/freenet/src/freenet/node/OutgoingPacketMangler.java
===================================================================
--- trunk/freenet/src/freenet/node/OutgoingPacketMangler.java 2007-07-12
17:43:30 UTC (rev 14046)
+++ trunk/freenet/src/freenet/node/OutgoingPacketMangler.java 2007-07-12
19:21:59 UTC (rev 14047)
@@ -8,6 +8,7 @@
import freenet.io.comm.Peer;
import freenet.io.comm.PeerContext;
import freenet.io.comm.SocketHandler;
+import freenet.support.ShortBuffer;
import freenet.support.WouldBlockException;
/**
@@ -95,4 +96,9 @@
* Get our addresses, as peers.
*/
public Peer[] getPrimaryIPAddress();
+
+ /**
+ * Get our compressed noderef
+ */
+ public byte[] getCompressedNoderef();
}
Modified: trunk/freenet/src/freenet/node/PeerManager.java
===================================================================
--- trunk/freenet/src/freenet/node/PeerManager.java 2007-07-12 17:43:30 UTC
(rev 14046)
+++ trunk/freenet/src/freenet/node/PeerManager.java 2007-07-12 19:21:59 UTC
(rev 14047)
@@ -1237,4 +1237,13 @@
}
updatePMUserAlert();
}
+
+ public boolean containsPeer(PeerNode pn) {
+ PeerNode[] peers = pn.isOpennet() ?
((PeerNode[])getOpennetPeers()) : ((PeerNode[])getDarknetPeers());
+
+ for(int i=0;i<peers.length;i++)
+ if(Arrays.equals(pn.getIdentity(),
peers[i].getIdentity())) return true;
+
+ return false;
+ }
}
Modified: trunk/freenet/src/freenet/node/PeerNode.java
===================================================================
--- trunk/freenet/src/freenet/node/PeerNode.java 2007-07-12 17:43:30 UTC
(rev 14046)
+++ trunk/freenet/src/freenet/node/PeerNode.java 2007-07-12 19:21:59 UTC
(rev 14047)
@@ -1611,6 +1611,11 @@
* The identity must not change, or we throw.
*/
private void processNewNoderef(byte[] data, int offset, int length) throws
FSParseException {
+ SimpleFieldSet fs = compressedNoderefToFieldSet(data, offset, length);
+ processNewNoderef(fs, false);
+ }
+
+ static SimpleFieldSet compressedNoderefToFieldSet(byte[] data, int offset,
int length) throws FSParseException {
if(length == 0) throw new FSParseException("Too short");
// Firstly, is it compressed?
if(data[offset] == 1) {
@@ -1639,7 +1644,7 @@
}
}
}
- if(logMINOR) Logger.minor(this, "Reference: "+new String(data, offset,
length)+ '(' +length+ ')');
+ if(logMINOR) Logger.minor(PeerNode.class, "Reference: "+new
String(data, offset, length)+ '(' +length+ ')');
// Now decode it
ByteArrayInputStream bais = new ByteArrayInputStream(data, offset+1,
length-1);
InputStreamReader isr;
@@ -1651,13 +1656,13 @@
BufferedReader br = new BufferedReader(isr);
SimpleFieldSet fs;
try {
- fs = new SimpleFieldSet(br, false, true);
+ return new SimpleFieldSet(br, false, true);
} catch (IOException e) {
- Logger.error(this, "Impossible: e", e);
- return;
+ FSParseException ex = new FSParseException("Impossible: "+e);
+ ex.initCause(e);
+ throw ex;
}
- processNewNoderef(fs, false);
- }
+ }
/**
* Process a new nodereference, as a SimpleFieldSet.
Modified: trunk/freenet/src/freenet/node/RequestHandler.java
===================================================================
--- trunk/freenet/src/freenet/node/RequestHandler.java 2007-07-12 17:43:30 UTC
(rev 14046)
+++ trunk/freenet/src/freenet/node/RequestHandler.java 2007-07-12 19:21:59 UTC
(rev 14047)
@@ -6,7 +6,12 @@
import freenet.crypt.DSAPublicKey;
import freenet.io.comm.ByteCounter;
import freenet.io.comm.DMT;
+import freenet.io.comm.DisconnectedException;
import freenet.io.comm.Message;
+import freenet.io.comm.MessageFilter;
+import freenet.io.comm.NotConnectedException;
+import freenet.io.comm.PeerParseException;
+import freenet.io.comm.ReferenceSignatureVerificationException;
import freenet.io.xfer.BlockTransmitter;
import freenet.io.xfer.PartiallyReceivedBlock;
import freenet.keys.CHKBlock;
@@ -16,6 +21,8 @@
import freenet.keys.NodeSSK;
import freenet.keys.SSKBlock;
import freenet.support.Logger;
+import freenet.support.ShortBuffer;
+import freenet.support.SimpleFieldSet;
/**
* Handle an incoming request. Does not do the actual fetching; that
@@ -96,6 +103,9 @@
node.addTransferringRequestHandler(uid);
if(bt.send())
status = RequestSender.SUCCESS; // for byte logging
+ if(source.isOpennet()) {
+ finishOpennetNoRelay();
+ }
}
return;
}
@@ -171,8 +181,13 @@
Message pk = DMT.createFNPSSKPubKey(uid,
((NodeSSK)rs.getSSKBlock().getKey()).getPubKey());
source.sendSync(pk, this);
}
- } else if(!rs.transferStarted()) {
- Logger.error(this, "Status is SUCCESS but we
never started a transfer on "+uid);
+ } else {
+ if(!rs.transferStarted()) {
+ Logger.error(this, "Status is SUCCESS
but we never started a transfer on "+uid);
+ } else {
+ // Successful CHK transfer, maybe path
fold
+ finishOpennet(rs);
+ }
}
return;
case RequestSender.VERIFY_FAILURE:
@@ -237,6 +252,126 @@
}
}
+ private void finishOpennet(RequestSender rs) {
+ if(!source.isOpennet()) return;
+ byte[] noderef = rs.waitForOpennetNoderef();
+ if(noderef == null) {
+ finishOpennetNoRelay();
+ return;
+ }
+
+ if(node.random.nextInt(OpennetManager.RESET_PATH_FOLDING_PROB)
== 0) {
+ finishOpennetNoRelay();
+ return;
+ }
+
+ finishOpennetRelay(rs, noderef);
+ }
+
+ private void finishOpennetNoRelay() {
+ OpennetManager om = node.getOpennet();
+ if(om != null) {
+ if(om.wantPeer()) {
+ Message msg =
DMT.createFNPOpennetConnectDestination(uid, new
ShortBuffer(om.crypto.myCompressedFullRef()));
+ try {
+ source.sendAsync(msg, null, 0, this);
+ } catch (NotConnectedException e) {
+ // Oh well...
+ return;
+ }
+
+ // Wait for response
+
+ MessageFilter mf =
MessageFilter.create().setSource(source).setField(DMT.UID,
uid).setTimeout(RequestSender.OPENNET_TIMEOUT).setType(DMT.FNPOpennetConnectReply);
+
+ try {
+ msg = node.usm.waitFor(mf, this);
+ } catch (DisconnectedException e) {
+ return; // Lost connection with request
source
+ }
+
+ if(msg == null) {
+ // Timeout
+ return;
+ }
+
+ byte[] noderef =
((ShortBuffer)msg.getObject(DMT.OPENNET_NODEREF)).getData();
+
+ SimpleFieldSet ref;
+ try {
+ ref =
PeerNode.compressedNoderefToFieldSet(noderef, 0, noderef.length);
+ } catch (FSParseException e) {
+ Logger.error(this, "Could not parse
opennet noderef for "+this+" from "+source, e);
+ return;
+ }
+
+ try {
+ if(!node.addNewOpennetNode(ref)) {
+ Logger.normal(this, "Asked for
opennet ref but didn't want it for "+this+" :\n"+ref);
+ return;
+ } else {
+ Logger.error(this, "Added
opennet noderef in "+this);
+ }
+ } catch (FSParseException e) {
+ Logger.error(this, "Could not parse
opennet noderef for "+this+" from "+source, e);
+ return;
+ } catch (PeerParseException e) {
+ Logger.error(this, "Could not parse
opennet noderef for "+this+" from "+source, e);
+ return;
+ } catch
(ReferenceSignatureVerificationException e) {
+ Logger.error(this, "Bad signature on
opennet noderef for "+this+" from "+source+" : "+e, e);
+ return;
+ }
+ } else {
+ Message msg =
DMT.createFNPOpennetCompletedAck(uid);
+ try {
+ source.sendAsync(msg, null, 0, this);
+ } catch (NotConnectedException e) {
+ // Oh well...
+ }
+ }
+ }
+ }
+
+ private void finishOpennetRelay(RequestSender rs, byte[] noderef) {
+ // Send the noderef back to the request source, wait for its ConnectReply, then forward that to the data source
+ PeerNode dataSource = rs.successFrom();
+
+ Message msg = DMT.createFNPOpennetConnectDestination(uid, new
ShortBuffer(noderef));
+
+ try {
+ source.sendAsync(msg, null, 0, this);
+ } catch (NotConnectedException e) {
+ // Lost contact with request source, nothing we can do
+ return;
+ }
+
+ // Now wait for reply from the request source
+
+ MessageFilter mf =
MessageFilter.create().setSource(source).setField(DMT.UID,
uid).setTimeout(RequestSender.OPENNET_TIMEOUT).setType(DMT.FNPOpennetConnectReply);
+
+ try {
+ msg = node.usm.waitFor(mf, this);
+ } catch (DisconnectedException e) {
+ return; // Lost connection with request source
+ }
+
+ if(msg == null) {
+ // Timeout
+ return;
+ }
+
+ noderef =
((ShortBuffer)msg.getObject(DMT.OPENNET_NODEREF)).getData();
+ msg = DMT.createFNPOpennetConnectReply(uid, new
ShortBuffer(noderef));
+
+ try {
+ dataSource.sendAsync(msg, null, 0, this);
+ } catch (NotConnectedException e) {
+ // How sad
+ return;
+ }
+ }
+
private Message createDataFound(KeyBlock block) {
if(block instanceof CHKBlock)
return DMT.createFNPCHKDataFound(uid,
block.getRawHeaders());
Modified: trunk/freenet/src/freenet/node/RequestSender.java
===================================================================
--- trunk/freenet/src/freenet/node/RequestSender.java 2007-07-12 17:43:30 UTC
(rev 14046)
+++ trunk/freenet/src/freenet/node/RequestSender.java 2007-07-12 19:21:59 UTC
(rev 14047)
@@ -12,6 +12,9 @@
import freenet.io.comm.DisconnectedException;
import freenet.io.comm.Message;
import freenet.io.comm.MessageFilter;
+import freenet.io.comm.NotConnectedException;
+import freenet.io.comm.PeerParseException;
+import freenet.io.comm.ReferenceSignatureVerificationException;
import freenet.io.comm.RetrievalException;
import freenet.io.xfer.BlockReceiver;
import freenet.io.xfer.PartiallyReceivedBlock;
@@ -25,6 +28,7 @@
import freenet.store.KeyCollisionException;
import freenet.support.Logger;
import freenet.support.ShortBuffer;
+import freenet.support.SimpleFieldSet;
/**
* @author amphibian
@@ -43,6 +47,8 @@
// Constants
static final int ACCEPTED_TIMEOUT = 5000;
static final int FETCH_TIMEOUT = 120000;
+ /** Wait up to this long to get a path folding reply */
+ static final int OPENNET_TIMEOUT = 120000;
/** One in this many successful requests is randomly reinserted.
* This is probably a good idea anyway but with the split store it's
essential. */
static final int RANDOM_REINSERT_INTERVAL = 200;
@@ -78,6 +84,7 @@
static final int GENERATED_REJECTED_OVERLOAD = 7;
static final int INTERNAL_ERROR = 8;
static final int RECENTLY_FAILED = 9;
+ private PeerNode successFrom;
private static boolean logMINOR;
@@ -613,10 +620,133 @@
synchronized(this) {
notifyAll();
+ if(status == SUCCESS)
+ successFrom = next;
}
+
+ if(status == SUCCESS && key instanceof NodeCHK && next != null &&
next.isOpennet()) {
+ finishOpennet(next);
+ }
+
}
- public byte[] getHeaders() {
+ /**
+ * Do path folding, maybe.
+ * Wait for either a CompletedAck or a ConnectDestination.
+ * If the former, exit.
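+ * If the latter, try to add the peer. If we do not want it, keep the noderef so
+ * that a RequestHandler can pass it on (see waitForOpennetNoderef()), and exit.
+ * Otherwise reply with a ConnectReply carrying our own noderef.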
+ */
+ private void finishOpennet(PeerNode next) {
+
+ try {
+ MessageFilter mfAck =
MessageFilter.create().setSource(next).setField(DMT.UID,
uid).setTimeout(OPENNET_TIMEOUT).setType(DMT.FNPOpennetCompletedAck);
+ MessageFilter mfConnect =
MessageFilter.create().setSource(next).setField(DMT.UID,
uid).setTimeout(OPENNET_TIMEOUT).setType(DMT.FNPOpennetConnectDestination);
+ MessageFilter mf =
mfAck.or(mfConnect).setMatchesDroppedConnection(true).setMatchesRestartedConnections(true);
+
+ Message m;
+ try {
+ m = node.usm.waitFor(mf, this);
+ } catch (DisconnectedException e) {
+ return; // Ok
+ }
+
+ if(m == null) {
+ // Timeout
+ Logger.error(this, "Timed out waiting for opennet
acknowledgement on "+this);
+ return;
+ } else if(m.getSpec() == DMT.FNPOpennetCompletedAck) {
+ if(logMINOR)
+ Logger.minor(this, "Destination does not want to path
fold on "+this);
+ return;
+ } else if(!(m.getSpec() == DMT.FNPOpennetConnectDestination)) {
+ Logger.error(this, "Got a "+m+" expecting opennet completed /
opennet connect destination on "+this);
+ return;
+ }
+
+ // m.getSpec() == DMT.FNPOpennetConnectDestination
+
+ byte[] noderef = ((ShortBuffer)
m.getObject(DMT.OPENNET_NODEREF)).getData();
+
+ SimpleFieldSet ref;
+ try {
+ ref = PeerNode.compressedNoderefToFieldSet(noderef, 0,
noderef.length);
+ } catch (FSParseException e) {
+ Logger.error(this, "Could not parse opennet noderef for
"+this+" from "+next, e);
+ return;
+ }
+
+ try {
+ if(!node.addNewOpennetNode(ref)) {
+ // If we don't want it let somebody else have it
+ synchronized(this) {
+ opennetNoderef = noderef;
+ }
+ return;
+ } else {
+ Logger.error(this, "Added opennet noderef in
"+this);
+ }
+ } catch (FSParseException e) {
+ Logger.error(this, "Could not parse opennet noderef for
"+this+" from "+next, e);
+ return;
+ } catch (PeerParseException e) {
+ Logger.error(this, "Could not parse opennet noderef for
"+this+" from "+next, e);
+ return;
+ } catch (ReferenceSignatureVerificationException e) {
+ Logger.error(this, "Bad signature on opennet noderef
for "+this+" from "+next+" : "+e, e);
+ return;
+ }
+
+ // Send our reference
+
+ Message msg = DMT.createFNPOpennetConnectReply(uid, new
ShortBuffer(next.getOutgoingMangler().getCompressedNoderef()));
+
+ try {
+ next.sendAsync(msg, null, 0, this);
+ } catch (NotConnectedException e) {
+ // Hmmm... let the LRU deal with it
+ if(logMINOR)
+ Logger.minor(this, "Not connected sending
ConnectReply on "+this+" to "+next);
+ }
+ } finally {
+ synchronized(this) {
+ opennetFinished = true;
+ notifyAll();
+ }
+ }
+ }
+
+ // Opennet stuff
+
+ /** Have we finished all opennet-related activities? */
+ private boolean opennetFinished;
+
+ /** Opennet noderef from next node */
+ private byte[] opennetNoderef;
+
+ public byte[] waitForOpennetNoderef() {
+ synchronized(this) {
+ while(true) {
+ if(opennetFinished) {
+ // Only one RequestHandler may take the noderef
+ byte[] ref = opennetNoderef;
+ opennetNoderef = null;
+ return ref;
+ }
+ try {
+ wait(OPENNET_TIMEOUT);
+ } catch (InterruptedException e) {
+ // Ignore
+ }
+ }
+ }
+ }
+
+ public PeerNode successFrom() {
+ return successFrom;
+ }
+
+ public byte[] getHeaders() {
return headers;
}