Author: toad
Date: 2007-12-01 17:14:09 +0000 (Sat, 01 Dec 2007)
New Revision: 16190

Added:
   trunk/freenet/src/freenet/node/AnnounceSender.java
Modified:
   trunk/freenet/src/freenet/node/Node.java
   trunk/freenet/src/freenet/node/NodeDispatcher.java
   trunk/freenet/src/freenet/node/OpennetManager.java
   trunk/freenet/src/freenet/node/PeerNode.java
Log:
Opennet announcement. Untested.

Added: trunk/freenet/src/freenet/node/AnnounceSender.java
===================================================================
--- trunk/freenet/src/freenet/node/AnnounceSender.java                          
(rev 0)
+++ trunk/freenet/src/freenet/node/AnnounceSender.java  2007-12-01 17:14:09 UTC 
(rev 16190)
@@ -0,0 +1,392 @@
+package freenet.node;
+
+import java.util.HashSet;
+
+import freenet.io.comm.ByteCounter;
+import freenet.io.comm.DMT;
+import freenet.io.comm.DisconnectedException;
+import freenet.io.comm.Message;
+import freenet.io.comm.MessageFilter;
+import freenet.io.comm.NotConnectedException;
+import freenet.io.comm.PeerParseException;
+import freenet.io.comm.ReferenceSignatureVerificationException;
+import freenet.support.Logger;
+import freenet.support.SimpleFieldSet;
+
+public class AnnounceSender implements Runnable, ByteCounter {
+
+    // Constants
+    static final int ACCEPTED_TIMEOUT = 5000;
+    static final int ANNOUNCE_TIMEOUT = 240000; // longer than a regular 
request as have to transfer noderefs hop by hop etc
+       
+       private final PeerNode source;
+       private final long uid;
+       private final OpennetManager om;
+       private final Node node;
+       private Message msg;
+       private byte[] noderefBuf;
+       private int noderefLength;
+       private short htl;
+       private double nearestLoc;
+       private double target;
+       private static boolean logMINOR;
+       
+       public AnnounceSender(Message m, long uid, PeerNode source, 
OpennetManager om, Node node) {
+               this.source = source;
+               this.uid = uid;
+               this.msg = m;
+               this.om = om;
+               this.node = node;
+               htl = m.getShort(DMT.HTL);
+               target = m.getDouble(DMT.TARGET_LOCATION); // FIXME validate
+               logMINOR = Logger.shouldLog(Logger.MINOR, this);
+       }
+
+       public void run() {
+               try {
+                       realRun();
+               } catch (Throwable t) {
+                       Logger.error(this, "Caught "+t+" announcing "+uid+" 
from "+source, t);
+               } finally {
+                       source.completedAnnounce(uid);
+                       source.node.completed(uid);
+               }
+       }
+
+       private void realRun() {
+               boolean hasForwarded = false;
+               try {
+                       source.sendAsync(DMT.createFNPAccepted(uid), null, 0, 
null);
+               } catch (NotConnectedException e) {
+                       return;
+               }
+               if(source != null) {
+                       if(!transferNoderef()) return;
+               }
+               
+        double myLoc = node.lm.getLocation();
+        if(Location.distance(target, myLoc) < Location.distance(target, 
nearestLoc)) {
+            nearestLoc = myLoc;
+            htl = node.maxHTL();
+        } else {
+               if(source != null)
+                       htl = node.decrementHTL(source, htl);
+        }
+        
+               // Now route it.
+               
+        HashSet nodesRoutedTo = new HashSet();
+        HashSet nodesNotIgnored = new HashSet();
+        while(true) {
+            if(logMINOR) Logger.minor(this, "htl="+htl);
+            if(htl == 0) {
+               // No more nodes.
+               complete();
+               return;
+            }
+            
+            // Route it
+            PeerNode next;
+            next = node.peers.closerPeer(source, nodesRoutedTo, 
nodesNotIgnored, target, true, node.isAdvancedModeEnabled(), -1, null);
+            
+            if(next == null) {
+                // Backtrack
+               rnf();
+                return;
+            }
+            if(logMINOR) Logger.minor(this, "Routing request to "+next);
+            nodesRoutedTo.add(next);
+            
+            if(hasForwarded)
+               htl = node.decrementHTL(source, htl);
+            
+            if(!sendTo(next)) continue;
+            
+            hasForwarded = true;
+            
+            Message msg = null;
+            
+            while(true) {
+               
+                /**
+                 * What are we waiting for?
+                 * FNPAccepted - continue
+                 * FNPRejectedLoop - go to another node
+                 * FNPRejectedOverload - go to another node
+                 */
+                
+                MessageFilter mfAccepted = 
MessageFilter.create().setSource(next).setField(DMT.UID, 
uid).setTimeout(ACCEPTED_TIMEOUT).setType(DMT.FNPAccepted);
+                MessageFilter mfRejectedLoop = 
MessageFilter.create().setSource(next).setField(DMT.UID, 
uid).setTimeout(ACCEPTED_TIMEOUT).setType(DMT.FNPRejectedLoop);
+                MessageFilter mfRejectedOverload = 
MessageFilter.create().setSource(next).setField(DMT.UID, 
uid).setTimeout(ACCEPTED_TIMEOUT).setType(DMT.FNPRejectedOverload);
+                MessageFilter mfOpennetDisabled = 
MessageFilter.create().setSource(next).setField(DMT.UID, 
uid).setTimeout(ACCEPTED_TIMEOUT).setType(DMT.FNPOpennetDisabled);
+                MessageFilter mfOpennetNoderefRejected = 
MessageFilter.create().setSource(next).setField(DMT.UID, 
uid).setTimeout(ACCEPTED_TIMEOUT).setType(DMT.FNPOpennetNoderefRejected);
+                
+                // mfRejectedOverload must be the last thing in the or
+                // So its or pointer remains null
+                // Otherwise we need to recreate it below
+                MessageFilter mf = 
mfAccepted.or(mfRejectedLoop.or(mfRejectedOverload.or(mfOpennetDisabled.or(mfOpennetNoderefRejected))));
+                
+                try {
+                    msg = node.usm.waitFor(mf, this);
+                    if(logMINOR) Logger.minor(this, "first part got "+msg);
+                } catch (DisconnectedException e) {
+                    Logger.normal(this, "Disconnected from "+next+" while 
waiting for Accepted on "+uid);
+                    break;
+                }
+                
+               if(msg == null) {
+                       if(logMINOR) Logger.minor(this, "Timeout waiting for 
Accepted");
+                       // Try next node
+                       msg = null;
+                       break;
+               }
+               
+               if(msg.getSpec() == DMT.FNPRejectedLoop) {
+                       if(logMINOR) Logger.minor(this, "Rejected loop");
+                       // Find another node to route to
+                       msg = null;
+                       break;
+               }
+               
+               if(msg.getSpec() == DMT.FNPRejectedOverload) {
+                       if(logMINOR) Logger.minor(this, "Rejected: overload");
+                                       // Give up on this one, try another
+                       msg = null;
+                                       break;
+               }
+               
+               if(msg.getSpec() == DMT.FNPOpennetDisabled) {
+                       source.setOpennetDisabled();
+                       msg = null;
+                       break;
+               }
+               
+               if(msg.getSpec() == DMT.FNPOpennetNoderefRejected) {
+                       int reason = msg.getInt(DMT.REJECT_CODE);
+                       Logger.normal(this, "Announce rejected by "+source+" : 
"+DMT.getOpennetRejectedCode(reason));
+                       msg = null;
+                       break;
+               }
+               
+               if(msg.getSpec() != DMT.FNPAccepted) {
+                       Logger.error(this, "Unrecognized message: "+msg);
+                       continue;
+               }
+               
+               break;
+            }
+            
+            if((msg == null) || (msg.getSpec() != DMT.FNPAccepted)) {
+               // Try another node
+               continue;
+            }
+
+            if(logMINOR) Logger.minor(this, "Got Accepted");
+            
+            // Otherwise, must be Accepted
+            
+            // So wait...
+            
+            while(true) {
+               
+               MessageFilter mfAnnounceCompleted = 
MessageFilter.create().setSource(next).setField(DMT.UID, 
uid).setTimeout(ANNOUNCE_TIMEOUT).setType(DMT.FNPOpennetAnnounceCompleted);
+               MessageFilter mfRouteNotFound = 
MessageFilter.create().setSource(next).setField(DMT.UID, 
uid).setTimeout(ANNOUNCE_TIMEOUT).setType(DMT.FNPRouteNotFound);
+               MessageFilter mfRejectedOverload = 
MessageFilter.create().setSource(next).setField(DMT.UID, 
uid).setTimeout(ANNOUNCE_TIMEOUT).setType(DMT.FNPRejectedOverload);
+               MessageFilter mfAnnounceReply = 
MessageFilter.create().setSource(next).setField(DMT.UID, 
uid).setTimeout(ANNOUNCE_TIMEOUT).setType(DMT.FNPOpennetAnnounceReply);
+                MessageFilter mfOpennetDisabled = 
MessageFilter.create().setSource(next).setField(DMT.UID, 
uid).setTimeout(ANNOUNCE_TIMEOUT).setType(DMT.FNPOpennetDisabled);
+               MessageFilter mf = 
mfAnnounceCompleted.or(mfRouteNotFound.or(mfRejectedOverload.or(mfAnnounceReply.or(mfOpennetDisabled))));
+               
+               try {
+                       msg = node.usm.waitFor(mf, this);
+               } catch (DisconnectedException e) {
+                       Logger.normal(this, "Disconnected from "+next+" while 
waiting for announcement");
+                       break;
+               }
+               
+               if(logMINOR) Logger.minor(this, "second part got "+msg);
+               
+               if(msg == null) {
+                       // Fatal timeout
+                       timedOut();
+                       return;
+               }
+               
+               if(msg.getSpec() == DMT.FNPOpennetAnnounceCompleted) {
+                       complete();
+                       return;
+               }
+               
+               if(msg.getSpec() == DMT.FNPRouteNotFound) {
+                       // Backtrack within available hops
+                       short newHtl = msg.getShort(DMT.HTL);
+                       if(newHtl < htl) htl = newHtl;
+                       break;
+               }
+
+               if(msg.getSpec() == DMT.FNPRejectedOverload) {
+                                       // Give up on this one, try another
+                                       break;
+               }
+               
+               if(msg.getSpec() == DMT.FNPOpennetDisabled) {
+                       source.setOpennetDisabled();
+                       msg = null;
+                       break;
+               }
+               
+               if(msg.getSpec() == DMT.FNPOpennetAnnounceReply) {
+                       validateForwardReply(msg, next);
+                       continue; // There may be more
+               }
+               
+               Logger.error(this, "Unexpected message: "+msg);
+            }
+        }
+       }
+
+       /**
+        * Validate a reply, and relay it back to the source.
+        * @param msg2 The AnnouncementReply message.
+        * @return True unless we lost the connection to our request source.
+        */
+       private boolean validateForwardReply(Message msg, PeerNode source) {
+               long xferUID = msg.getLong(DMT.TRANSFER_UID);
+               int noderefLength = msg.getInt(DMT.NODEREF_LENGTH);
+               int paddedLength = msg.getInt(DMT.PADDED_LENGTH);
+               byte[] noderefBuf = om.innerWaitForOpennetNoderef(xferUID, 
paddedLength, noderefLength, source, false, uid, true, this);
+               if(noderefBuf == null) {
+                       return true; // Don't relay
+               }
+               SimpleFieldSet fs = om.validateNoderef(noderefBuf, 0, 
noderefLength, source);
+               if(fs == null) {
+                       om.rejectRef(uid, source, DMT.NODEREF_REJECTED_INVALID, 
this);
+                       return true; // Don't relay
+               }
+               // Now relay it
+               try {
+                       om.sendAnnouncementReply(uid, source, noderefBuf, this);
+               } catch (NotConnectedException e) {
+                       // Hmmm...!
+                       return false;
+               }
+               return true;
+       }
+
+       /**
+        * Send an AnnouncementRequest.
+        * @param next The node to send the announcement to.
+        * @return True if the announcement was successfully sent.
+        */
+       private boolean sendTo(PeerNode next) {
+               try {
+                       om.sendAnnouncementRequest(uid, next, noderefBuf, this, 
target, htl, nearestLoc);
+               } catch (NotConnectedException e) {
+                       if(logMINOR) Logger.minor(this, "Disconnected");
+                       return false;
+               }
+               return true;
+       }
+
+       private void timedOut() {
+               Message msg = DMT.createFNPRejectedOverload(uid, false);
+               try {
+                       source.sendAsync(msg, null, 0, this);
+               } catch (NotConnectedException e) {
+                       // Ok
+               }
+       }
+
+       private void rnf() {
+               Message msg = DMT.createFNPRouteNotFound(uid, htl);
+               try {
+                       source.sendAsync(msg, null, 0, this);
+               } catch (NotConnectedException e) {
+                       // Ok
+               }
+       }
+
+       private void complete() {
+               Message msg = DMT.createFNPOpennetAnnounceCompleted(uid);
+               try {
+                       source.sendAsync(msg, null, 0, this);
+               } catch (NotConnectedException e) {
+                       // Oh well.
+               }
+       }
+
+       /**
+        * @return True unless the noderef is bogus.
+        */
+       private boolean transferNoderef() {
+               long xferUID = msg.getLong(DMT.TRANSFER_UID);
+               noderefLength = msg.getInt(DMT.NODEREF_LENGTH);
+               int paddedLength = msg.getInt(DMT.PADDED_LENGTH);
+               noderefBuf = om.innerWaitForOpennetNoderef(xferUID, 
paddedLength, noderefLength, source, false, uid, true, this);
+               if(noderefBuf == null) {
+                       return false;
+               }
+               SimpleFieldSet fs = om.validateNoderef(noderefBuf, 0, 
noderefLength, source);
+               if(fs == null) {
+                       om.rejectRef(uid, source, DMT.NODEREF_REJECTED_INVALID, 
this);
+                       return false;
+               }
+               // If we want it, add it and send it.
+               try {
+                       if(om.addNewOpennetNode(fs)) {
+                               sendOurRef();
+                       } else {
+                               // Okay, just route it.
+                       }
+               } catch (FSParseException e) {
+                       om.rejectRef(uid, source, DMT.NODEREF_REJECTED_INVALID, 
this);
+                       return false;
+               } catch (PeerParseException e) {
+                       om.rejectRef(uid, source, DMT.NODEREF_REJECTED_INVALID, 
this);
+                       return false;
+               } catch (ReferenceSignatureVerificationException e) {
+                       om.rejectRef(uid, source, DMT.NODEREF_REJECTED_INVALID, 
this);
+                       return false;
+               }
+               return true;
+       }
+
+       private void sendOurRef() {
+               // FIXME transmit our noderef back to the node
+               // TODO Auto-generated method stub
+               
+       }
+
+       private volatile Object totalBytesSync = new Object();
+       private int totalBytesSent;
+       
+       public void sentBytes(int x) {
+               synchronized(totalBytesSync) {
+                       totalBytesSent += x;
+               }
+       }
+       
+       public int getTotalSentBytes() {
+               synchronized(totalBytesSync) {
+                       return totalBytesSent;
+               }
+       }
+       
+       private int totalBytesReceived;
+       
+       public void receivedBytes(int x) {
+               synchronized(totalBytesSync) {
+                       totalBytesReceived += x;
+               }
+       }
+       
+       public int getTotalReceivedBytes() {
+               synchronized(totalBytesSync) {
+                       return totalBytesReceived;
+               }
+       }
+
+       public void sentPayload(int x) {
+               // Doesn't count.
+       }
+       
+}

Modified: trunk/freenet/src/freenet/node/Node.java
===================================================================
--- trunk/freenet/src/freenet/node/Node.java    2007-12-01 17:00:49 UTC (rev 
16189)
+++ trunk/freenet/src/freenet/node/Node.java    2007-12-01 17:14:09 UTC (rev 
16190)
@@ -2224,7 +2224,7 @@
        /**
         * A request completed (regardless of success).
         */
-       private synchronized void completed(long id) {
+       synchronized void completed(long id) {
                recentlyCompletedIDs.push(new Long(id));
                while(recentlyCompletedIDs.size() > MAX_RECENTLY_COMPLETED_IDS)
                        recentlyCompletedIDs.pop();

Modified: trunk/freenet/src/freenet/node/NodeDispatcher.java
===================================================================
--- trunk/freenet/src/freenet/node/NodeDispatcher.java  2007-12-01 17:00:49 UTC 
(rev 16189)
+++ trunk/freenet/src/freenet/node/NodeDispatcher.java  2007-12-01 17:14:09 UTC 
(rev 16190)
@@ -97,6 +97,8 @@
                        return node.nodeUpdater.uom.handleRequestMain(m, 
source);
                } else if(spec == DMT.UOMSendingMain) {
                        return node.nodeUpdater.uom.handleSendingMain(m, 
source);
+               } else if(spec == DMT.FNPOpennetAnnounceRequest) {
+                       return handleAnnounceRequest(m, source);
                }

                if(!source.isRoutable()) return false;
@@ -141,7 +143,7 @@
 //                     return handleProbeRejected(m, source);
 //             } else if(spec == DMT.FNPProbeTrace) {
 //                     return handleProbeTrace(m, source);
-               }
+               } 
                return false;
        }

@@ -265,6 +267,49 @@
                return true;
        }

+       private boolean handleAnnounceRequest(Message m, PeerNode source) {
+               long uid = m.getLong(DMT.UID);
+               OpennetManager om = node.getOpennet();
+               if(om == null) {
+                       Message msg = DMT.createFNPOpennetDisabled(uid);
+                       try {
+                               source.sendAsync(msg, null, 0, null);
+                       } catch (NotConnectedException e) {
+                               // Ok
+                       }
+                       return true;
+               }
+               if(node.recentlyCompleted(uid)) {
+                       Message msg = DMT.createFNPRejectedLoop(uid);
+                       try {
+                               source.sendAsync(msg, null, 0, null);
+                       } catch (NotConnectedException e) {
+                               // Ok
+                       }
+                       return true;
+               }
+               boolean success = false;
+               try {
+                       if(!source.shouldAcceptAnnounce(uid)) {
+                               node.completed(uid);
+                               Message msg = 
DMT.createFNPRejectedOverload(uid, true);
+                               try {
+                                       source.sendAsync(msg, null, 0, null);
+                               } catch (NotConnectedException e) {
+                                       // Ok
+                               }
+                               return true;
+                       }
+                       AnnounceSender sender = new AnnounceSender(m, uid, 
source, om, node);
+                       node.executor.execute(sender, "Announcement sender for 
"+uid);
+                       success = true;
+                       return true;
+               } finally {
+                       if(!success)
+                               source.completedAnnounce(uid);
+               }
+       }
+
        final Hashtable routedContexts = new Hashtable();

        static class RoutedContext {

Modified: trunk/freenet/src/freenet/node/OpennetManager.java
===================================================================
--- trunk/freenet/src/freenet/node/OpennetManager.java  2007-12-01 17:00:49 UTC 
(rev 16189)
+++ trunk/freenet/src/freenet/node/OpennetManager.java  2007-12-01 17:14:09 UTC 
(rev 16190)
@@ -518,6 +518,21 @@
                innerSendOpennetRef(xferUID, padded, peer);
        }

+       public void sendAnnouncementReply(long uid, PeerNode peer, byte[] 
noderef, ByteCounter ctr) 
+       throws NotConnectedException {
+               byte[] padded = new byte[PADDED_NODEREF_SIZE];
+               if(noderef.length > padded.length) {
+                       Logger.error(this, "Noderef too big: "+noderef.length+" 
bytes");
+                       return;
+               }
+               System.arraycopy(noderef, 0, padded, 0, noderef.length);
+               long xferUID = node.random.nextLong();
+               Message msg = DMT.createFNPOpennetAnnounceReply(uid, xferUID, 
noderef.length, 
+                               padded.length);
+               peer.sendAsync(msg, null, 0, ctr);
+               innerSendOpennetRef(xferUID, padded, peer);
+       }
+       
        /**
         * Wait for an opennet noderef.
         * @param isReply If true, wait for an FNPOpennetConnectReply[New], if 
false wait for an FNPOpennetConnectDestination[New].

Modified: trunk/freenet/src/freenet/node/PeerNode.java
===================================================================
--- trunk/freenet/src/freenet/node/PeerNode.java        2007-12-01 17:00:49 UTC 
(rev 16189)
+++ trunk/freenet/src/freenet/node/PeerNode.java        2007-12-01 17:14:09 UTC 
(rev 16190)
@@ -266,6 +266,7 @@
        final WeakReference myRef;
        /** The node is being disconnected, but it may take a while. */
        private boolean disconnecting;
+       
        /**
         * For FNP link setup:
         *  The initiator has to ensure that nonces send back by the
@@ -3038,4 +3039,47 @@
        synchronized boolean manyPacketsClaimedSentNotReceived() {
                return manyPacketsClaimedSentNotReceived;
        }
+       
+       static final int MAX_SIMULTANEOUS_ANNOUNCEMENTS = 1;
+       static final int MAX_ANNOUNCE_DELAY = 1000;
+       private long timeLastAcceptedAnnouncement;
+       private long[] runningAnnounceUIDs = new long[0];
+       
+       public synchronized boolean shouldAcceptAnnounce(long uid) {
+               long now = System.currentTimeMillis();
+               if(runningAnnounceUIDs.length < MAX_SIMULTANEOUS_ANNOUNCEMENTS 
&&
+                               now - timeLastAcceptedAnnouncement > 
MAX_ANNOUNCE_DELAY) {
+                       long[] newList = new long[runningAnnounceUIDs.length + 
1];
+                       if(runningAnnounceUIDs.length > 0)
+                               System.arraycopy(runningAnnounceUIDs, 0, 
newList, 0, runningAnnounceUIDs.length);
+                       newList[runningAnnounceUIDs.length] = uid;
+                       timeLastAcceptedAnnouncement = now;
+                       return true;
+               } else {
+                       return false;
+               }
+       }
+       
+       public synchronized boolean completedAnnounce(long uid) {
+               if(runningAnnounceUIDs.length == 0) return false;
+               long[] newList = new long[runningAnnounceUIDs.length - 1];
+               int x = 0;
+               for(int i=0;i<runningAnnounceUIDs.length;i++) {
+                       if(i == runningAnnounceUIDs.length) return false;
+                       long l = runningAnnounceUIDs[i];
+                       if(l == uid) continue;
+                       newList[x++] = l;
+               }
+               runningAnnounceUIDs = newList;
+               if(x < runningAnnounceUIDs.length) {
+                       newList = new long[x];
+                       System.arraycopy(runningAnnounceUIDs, 0, newList, 0, x);
+                       runningAnnounceUIDs = newList;
+               }
+               return true;
+       }
+
+       public void setOpennetDisabled() {
+               // FIXME
+       }
 }


Reply via email to