Author: toad
Date: 2007-10-25 17:20:23 +0000 (Thu, 25 Oct 2007)
New Revision: 15562

Modified:
   trunk/freenet/src/freenet/node/OpennetManager.java
   trunk/freenet/src/freenet/node/RequestHandler.java
   trunk/freenet/src/freenet/node/RequestSender.java
Log:
More refactoring, probably some fixes, receive the new format noderefs if they 
are sent.

Modified: trunk/freenet/src/freenet/node/OpennetManager.java
===================================================================
--- trunk/freenet/src/freenet/node/OpennetManager.java  2007-10-25 16:31:34 UTC 
(rev 15561)
+++ trunk/freenet/src/freenet/node/OpennetManager.java  2007-10-25 17:20:23 UTC 
(rev 15562)
@@ -19,16 +19,19 @@
 import freenet.io.comm.DMT;
 import freenet.io.comm.DisconnectedException;
 import freenet.io.comm.Message;
+import freenet.io.comm.MessageFilter;
 import freenet.io.comm.NotConnectedException;
 import freenet.io.comm.Peer;
 import freenet.io.comm.PeerParseException;
 import freenet.io.comm.ReferenceSignatureVerificationException;
+import freenet.io.xfer.BulkReceiver;
 import freenet.io.xfer.BulkTransmitter;
 import freenet.io.xfer.PartiallyReceivedBulk;
 import freenet.support.LRUQueue;
 import freenet.support.Logger;
 import freenet.support.ShortBuffer;
 import freenet.support.SimpleFieldSet;
+import freenet.support.SizeUtil;
 import freenet.support.io.ByteArrayRandomAccessThing;
 import freenet.support.transport.ip.HostnameSyntaxException;

@@ -81,7 +84,10 @@
        static final int MIN_TIME_BETWEEN_OFFERS = 30*1000;
        private static boolean logMINOR;

+       /** How big to pad opennet noderefs to? If they are bigger than this 
then we won't send them. */
        static final int PADDED_NODEREF_SIZE = 3072;
+       /** Allow for future expansion. However at any given time all noderefs 
should be PADDED_NODEREF_SIZE */
+       static final int MAX_OPENNET_NODEREF_LENGTH = 32768;

        public OpennetManager(Node node, NodeCryptoConfig opennetConfig) throws 
NodeInitException {
                logMINOR = Logger.shouldLog(Logger.MINOR, this);
@@ -501,4 +507,75 @@
                }
        }

+       /**
+        * Wait for an opennet noderef from source, in either the old (inline) or the new (bulk transfer) format; ctr is passed on to the message wait.
+        * @param isReply If true, wait for an FNPOpennetConnectReply[New]; if false, wait for an FNPOpennetConnectDestination[New] or an FNPOpennetCompletedAck.
+        * @param uid The UID of the parent request.
+        * @return The noderef bytes, or null on disconnection, timeout, ack, invalid lengths or a failed bulk transfer.
+        */
+       public byte[] waitForOpennetNoderef(boolean isReply, PeerNode source, 
long uid, ByteCounter ctr) {
+               // FIXME remove back compat code: mfReply matches the old-format messages
+               MessageFilter mfReply = 
+                       
MessageFilter.create().setSource(source).setField(DMT.UID, 
uid).setTimeout(RequestSender.OPENNET_TIMEOUT).
+                       setType(isReply ? DMT.FNPOpennetConnectReply : 
DMT.FNPOpennetConnectDestination);
+               MessageFilter mfNewReply =
+                       
MessageFilter.create().setSource(source).setField(DMT.UID, 
uid).setTimeout(RequestSender.OPENNET_TIMEOUT).
+                       setType(isReply ? DMT.FNPOpennetConnectReplyNew : 
DMT.FNPOpennetConnectDestinationNew);
+               MessageFilter mf =
+                       mfReply.or(mfNewReply);
+               if(!isReply) {
+                       // Also accept an FNPOpennetCompletedAck in place of a noderef
+                       MessageFilter mfAck = 
+                               
MessageFilter.create().setSource(source).setField(DMT.UID, 
uid).setTimeout(RequestSender.OPENNET_TIMEOUT).setType(DMT.FNPOpennetCompletedAck);
+                       mf = mfAck.or(mf);
+               }
+               
mf.setMatchesDroppedConnection(true).setMatchesRestartedConnections(true);
+               Message msg;
+               
+               try {
+                       msg = node.usm.waitFor(mf, ctr);
+               } catch (DisconnectedException e) {
+                       Logger.normal(this, "No opennet response because node 
disconnected on "+this);
+                       return null; // Lost connection with the peer we were waiting on
+               }
+               
+               if(msg == null) {
+                       // Timeout
+                       Logger.normal(this, "Timeout waiting for opennet peer 
on "+this);
+                       return null;
+               }
+               
+               if(msg.getSpec() == DMT.FNPOpennetCompletedAck)
+                       return null; // Acked (only possible if !isReply)
+               
+               // FIXME remove back compat: the old format carries the noderef in the message's OPENNET_NODEREF field
+               if(msg.getSpec() == DMT.FNPOpennetConnectReply || msg.getSpec() 
== DMT.FNPOpennetConnectDestination) {
+                       return 
((ShortBuffer)msg.getObject(DMT.OPENNET_NODEREF)).getData();
+               } else {
+                       // New format: the message only announces a bulk transfer, which is received below. NOTE(review): negative lengths are not rejected before the allocation - confirm they cannot occur.
+               long xferUID = msg.getLong(DMT.TRANSFER_UID);
+               int paddedLength = msg.getInt(DMT.PADDED_LENGTH);
+               int realLength = msg.getInt(DMT.NODEREF_LENGTH);
+               if(paddedLength > OpennetManager.MAX_OPENNET_NODEREF_LENGTH) {
+                       Logger.error(this, "Noderef too big: 
"+SizeUtil.formatSize(paddedLength)+" real length 
"+SizeUtil.formatSize(realLength));
+                       return null;
+               }
+               if(realLength > paddedLength) {
+                       Logger.error(this, "Real length larger than padded 
length: "+SizeUtil.formatSize(paddedLength)+" real length 
"+SizeUtil.formatSize(realLength));
+                       return null;
+               }
+               byte[] buf = new byte[paddedLength];
+               ByteArrayRandomAccessThing raf = new 
ByteArrayRandomAccessThing(buf);
+               PartiallyReceivedBulk prb = new PartiallyReceivedBulk(node.usm, 
buf.length, Node.PACKET_SIZE, raf, false);
+               BulkReceiver br = new BulkReceiver(prb, source, xferUID);
+               if(!br.receive()) {
+                       Logger.error(this, "Failed to receive noderef bulk 
transfer for "+this);
+                               return null;
+               }                       
+               byte[] noderef = new byte[realLength];
+               System.arraycopy(buf, 0, noderef, 0, realLength);
+               return noderef;
+               }
+       }
+
 }

Modified: trunk/freenet/src/freenet/node/RequestHandler.java
===================================================================
--- trunk/freenet/src/freenet/node/RequestHandler.java  2007-10-25 16:31:34 UTC 
(rev 15561)
+++ trunk/freenet/src/freenet/node/RequestHandler.java  2007-10-25 17:20:23 UTC 
(rev 15562)
@@ -13,7 +13,9 @@
 import freenet.io.comm.PeerParseException;
 import freenet.io.comm.ReferenceSignatureVerificationException;
 import freenet.io.xfer.BlockTransmitter;
+import freenet.io.xfer.BulkReceiver;
 import freenet.io.xfer.PartiallyReceivedBlock;
+import freenet.io.xfer.PartiallyReceivedBulk;
 import freenet.keys.CHKBlock;
 import freenet.keys.Key;
 import freenet.keys.KeyBlock;
@@ -23,6 +25,8 @@
 import freenet.support.Logger;
 import freenet.support.ShortBuffer;
 import freenet.support.SimpleFieldSet;
+import freenet.support.SizeUtil;
+import freenet.support.io.ByteArrayRandomAccessThing;

 /**
  * Handle an incoming request. Does not do the actual fetching; that
@@ -320,7 +324,11 @@

        finishOpennetRelay(noderef, om);
     }
-    
+
+       /**
+        * Send our noderef to the request source, wait for a reply, if we get 
one add it. Called when either the request
+        * wasn't routed, or the node it was routed to didn't return a noderef.
+        */
     private void finishOpennetNoRelayInner(OpennetManager om) {
        if(logMINOR)
                Logger.minor(this, "Finishing opennet: sending own reference");
@@ -336,24 +344,9 @@

                        // Wait for response

-                       MessageFilter mf = 
MessageFilter.create().setSource(source).setField(DMT.UID, 
uid).setTimeout(RequestSender.OPENNET_TIMEOUT).setType(DMT.FNPOpennetConnectReply);
-                       Message msg;
+                       byte[] noderef = 
+                               om.waitForOpennetNoderef(true, source, uid, 
this);

-                       try {
-                               msg = node.usm.waitFor(mf, this);
-                       } catch (DisconnectedException e) {
-                               Logger.normal(this, "No opennet response 
because node disconnected on "+this);
-                               return; // Lost connection with request source
-                       }
-                       
-                       if(msg == null) {
-                               // Timeout
-                               Logger.normal(this, "Timeout waiting for 
opennet peer on "+this);
-                               return;
-                       }
-                       
-                       byte[] noderef = 
((ShortBuffer)msg.getObject(DMT.OPENNET_NODEREF)).getData();
-                       
                        SimpleFieldSet ref;
                        try {
                                ref = 
PeerNode.compressedNoderefToFieldSet(noderef, 0, noderef.length);
@@ -384,7 +377,13 @@
                        // Oh well...
                }
     }
-    
+
+    /**
+     * Called when the node we routed the request to returned a valid noderef, 
and we don't want it.
+     * So we relay it downstream to somebody who does, and wait to relay the 
response back upstream.
+     * @param noderef
+     * @param om
+     */
        private void finishOpennetRelay(byte[] noderef, OpennetManager om) {
        if(logMINOR)
                Logger.minor(this, "Finishing opennet: relaying reference from 
"+rs.successFrom());
@@ -392,32 +391,22 @@
                PeerNode dataSource = rs.successFrom();

                try {
-                       om.sendOpennetRef(false, uid, source, 
om.crypto.myCompressedFullRef(), this);
+                       om.sendOpennetRef(false, uid, source, noderef, this);
                } catch (NotConnectedException e) {
                        // Lost contact with request source, nothing we can do
                        return;
                }

-               // Now wait for reply from the request source
+               // Now wait for reply from the request source.

-               MessageFilter mf = 
MessageFilter.create().setSource(source).setField(DMT.UID, 
uid).setTimeout(RequestSender.OPENNET_TIMEOUT).setType(DMT.FNPOpennetConnectReply);
-
-               Message msg;
-               try {
-                       msg = node.usm.waitFor(mf, this);
-               } catch (DisconnectedException e) {
-                       return; // Lost connection with request source
-               }
+               byte[] newNoderef = om.waitForOpennetNoderef(true, source, uid, 
this);

-               if(msg == null) {
-                       // Timeout
-                       return;
-               }
+               if(noderef == null) return;

-               noderef = 
((ShortBuffer)msg.getObject(DMT.OPENNET_NODEREF)).getData();
+               // Send it forward to the data source, if it is valid.

                try {
-                       SimpleFieldSet fs = 
PeerNode.compressedNoderefToFieldSet(noderef, 0, noderef.length);
+                       SimpleFieldSet fs = 
PeerNode.compressedNoderefToFieldSet(newNoderef, 0, noderef.length);
                        if(fs.getBoolean("opennet", false)) {
                                try {
                                        om.sendOpennetRef(true, uid, 
dataSource, noderef, this);
@@ -429,13 +418,6 @@
                } catch (FSParseException e1) {
                        // Invalid, clear it
                }
-               
-               try {
-                       dataSource.sendAsync(msg, null, 0, this);
-               } catch (NotConnectedException e) {
-                       // How sad
-                       return;
-               }
        }

        private Message createDataFound(KeyBlock block) {

Modified: trunk/freenet/src/freenet/node/RequestSender.java
===================================================================
--- trunk/freenet/src/freenet/node/RequestSender.java   2007-10-25 16:31:34 UTC 
(rev 15561)
+++ trunk/freenet/src/freenet/node/RequestSender.java   2007-10-25 17:20:23 UTC 
(rev 15562)
@@ -18,7 +18,9 @@
 import freenet.io.comm.ReferenceSignatureVerificationException;
 import freenet.io.comm.RetrievalException;
 import freenet.io.xfer.BlockReceiver;
+import freenet.io.xfer.BulkReceiver;
 import freenet.io.xfer.PartiallyReceivedBlock;
+import freenet.io.xfer.PartiallyReceivedBulk;
 import freenet.keys.CHKBlock;
 import freenet.keys.Key;
 import freenet.keys.KeyVerifyException;
@@ -30,6 +32,8 @@
 import freenet.support.Logger;
 import freenet.support.ShortBuffer;
 import freenet.support.SimpleFieldSet;
+import freenet.support.SizeUtil;
+import freenet.support.io.ByteArrayRandomAccessThing;

 /**
  * @author amphibian
@@ -652,6 +656,8 @@
                } catch (DisconnectedException e) {
                        // Fine by me.
                }
+               
+               // FIXME support new format path folding
        }

        /**
@@ -663,60 +669,39 @@
      */
     private void finishOpennet(PeerNode next) {

-       try {
-       MessageFilter mfAck = 
MessageFilter.create().setSource(next).setField(DMT.UID, 
uid).setTimeout(OPENNET_TIMEOUT).setType(DMT.FNPOpennetCompletedAck);
-       MessageFilter mfConnect = 
MessageFilter.create().setSource(next).setField(DMT.UID, 
uid).setTimeout(OPENNET_TIMEOUT).setType(DMT.FNPOpennetConnectDestination);
-       MessageFilter mf = 
mfAck.or(mfConnect).setMatchesDroppedConnection(true).setMatchesRestartedConnections(true);
+       OpennetManager om;

-       Message m;
-               try {
-                       m = node.usm.waitFor(mf, this);
-               } catch (DisconnectedException e) {
-                       return; // Ok
-               }
-       
-       if(m == null) {
-               // Timeout
-               Logger.error(this, "Timed out waiting for opennet 
acknowledgement on "+this+" from "+next);
-               return;
-       } else if(m.getSpec() == DMT.FNPOpennetCompletedAck) {
-               if(logMINOR)
-                       Logger.minor(this, "Destination does not want to path 
fold on "+this+" from "+next);
-               return;
-       } else if(!(m.getSpec() == DMT.FNPOpennetConnectDestination)) {
-               Logger.error(this, "Got a "+m+" expecting opennet completed / 
opennet connect destination on "+this+" from "+next);
-               return;
-       }
-       
-       // DMT.getSpec() === DMT.FNPOpennetConnectDestination
-       
-       byte[] noderef = ((ShortBuffer) 
m.getObject(DMT.OPENNET_NODEREF)).getData();
-       
-       SimpleFieldSet ref;
-               try {
-                       ref = PeerNode.compressedNoderefToFieldSet(noderef, 0, 
noderef.length);
-               } catch (FSParseException e) {
-                       Logger.error(this, "Could not parse opennet noderef for 
"+this+" from "+next, e);
-                       return;
-               }
-               
-               if(!ref.getBoolean("opennet", false)) {
-                       Logger.error(this, "Could not parse opennet noderef for 
"+this+" from "+next);
-                       return;
-               }
-       
-               OpennetManager om = node.getOpennet();
        try {
-                       if(om == null || 
-                                       (om != null /* prevent race */ && 
!node.addNewOpennetNode(ref))) {
+               om = node.getOpennet();
+               
+               if(om == null) return; // Nothing to do
+               
+               byte[] noderef = om.waitForOpennetNoderef(false, next, uid, 
this);
+               
+               if(noderef == null) return;
+               
+               SimpleFieldSet ref = 
PeerNode.compressedNoderefToFieldSet(noderef, 0, noderef.length);
+               
+               if(!ref.getBoolean("opennet", false)) {
+                       Logger.error(this, "Could not parse opennet noderef for 
"+this+" from "+next);
+                       return;
+               }
+               
+                       if(!node.addNewOpennetNode(ref)) {
                                // If we don't want it let somebody else have it
                                synchronized(this) {
                                        opennetNoderef = noderef;
+                                       // RequestHandler will send a noderef 
back up, eventually
                                }
                                return;
                        } else {
+                               // opennetNoderef = null i.e. we want the 
noderef so we won't pass it further down.
                                Logger.error(this, "Added opennet noderef in 
"+this+" from "+next);
                        }
+                       
+               // We want the node: send our reference
+               om.sendOpennetRef(true, uid, next, 
om.crypto.myCompressedFullRef(), this);
+
                } catch (FSParseException e) {
                        Logger.error(this, "Could not parse opennet noderef for 
"+this+" from "+next, e);
                        return;
@@ -726,17 +711,10 @@
                } catch (ReferenceSignatureVerificationException e) {
                        Logger.error(this, "Bad signature on opennet noderef 
for "+this+" from "+next+" : "+e, e);
                        return;
-               }
-       
-       // Send our reference
-       
-       try {
-               om.sendOpennetRef(true, uid, source, 
om.crypto.myCompressedFullRef(), this);
                } catch (NotConnectedException e) {
                        // Hmmm... let the LRU deal with it
                        if(logMINOR)
                                Logger.minor(this, "Not connected sending 
ConnectReply on "+this+" to "+next);
-               }
        } finally {
                synchronized(this) {
                        opennetFinished = true;


Reply via email to