Author: toad
Date: 2007-11-28 21:08:37 +0000 (Wed, 28 Nov 2007)
New Revision: 16045

Added:
   trunk/freenet/src/freenet/io/PortForwardBrokenDetector.java
   trunk/freenet/src/freenet/io/comm/PortForwardSensitiveSocketHandler.java
Modified:
   trunk/freenet/src/freenet/io/AddressTracker.java
   trunk/freenet/src/freenet/io/comm/DMT.java
   trunk/freenet/src/freenet/io/comm/UdpSocketHandler.java
   trunk/freenet/src/freenet/node/Node.java
   trunk/freenet/src/freenet/node/NodeCrypto.java
   trunk/freenet/src/freenet/node/NodeDispatcher.java
   trunk/freenet/src/freenet/node/PeerNode.java
Log:
Add port-forward-broken detection via he-says-he-sent-packets vs 
i-received-packets comparison.

Modified: trunk/freenet/src/freenet/io/AddressTracker.java
===================================================================
--- trunk/freenet/src/freenet/io/AddressTracker.java    2007-11-28 20:22:38 UTC 
(rev 16044)
+++ trunk/freenet/src/freenet/io/AddressTracker.java    2007-11-28 21:08:37 UTC 
(rev 16045)
@@ -49,6 +49,8 @@
        /** InetAddressAddressTrackerItem's by InetAddress */
        private final HashMap ipTrackers;

+       private PortForwardBrokenDetector brokenDetector;
+       
        /** Maximum number of Item's of either type */
        static final int MAX_ITEMS = 1000;

@@ -224,6 +226,7 @@

        public int getPortForwardStatus() {
                long minGap = getLongestSendReceiveGap(HORIZON);
+               if(brokenDetector != null && brokenDetector.isBroken()) return 
DEFINITELY_NATED;
                if(minGap > DEFINITELY_TUNNEL_LENGTH)
                        return DEFINITELY_PORT_FORWARDED;
                if(minGap > MAYBE_TUNNEL_LENGTH)
@@ -231,6 +234,10 @@
                return DONT_KNOW;
        }

+       public synchronized void setBrokenDetector(PortForwardBrokenDetector d) 
{
+               brokenDetector = d;
+       }
+       
        public static String statusString(int status) {
                switch(status) {
                case DEFINITELY_PORT_FORWARDED:
@@ -298,4 +305,9 @@
                }
                return sfs;
        }
+
+       /** Called when something changes at a higher level suggesting that the 
status may be wrong */
+       public void rescan() {
+               // Do nothing for now, as we don't maintain any final state yet.
+       }
 }

Added: trunk/freenet/src/freenet/io/PortForwardBrokenDetector.java
===================================================================
--- trunk/freenet/src/freenet/io/PortForwardBrokenDetector.java                 
        (rev 0)
+++ trunk/freenet/src/freenet/io/PortForwardBrokenDetector.java 2007-11-28 
21:08:37 UTC (rev 16045)
@@ -0,0 +1,8 @@
+package freenet.io;
+
+public interface PortForwardBrokenDetector {
+
+       /** @return True if there is a good reason to think that port 
forwarding is broken. */
+       boolean isBroken();
+
+}

Modified: trunk/freenet/src/freenet/io/comm/DMT.java
===================================================================
--- trunk/freenet/src/freenet/io/comm/DMT.java  2007-11-28 20:22:38 UTC (rev 
16044)
+++ trunk/freenet/src/freenet/io/comm/DMT.java  2007-11-28 21:08:37 UTC (rev 
16045)
@@ -1082,12 +1082,14 @@
        public static final MessageType FNPSentPackets = new 
MessageType("FNPSentPackets") {{
                addField(TIME_DELTAS, ShortBuffer.class);
                addField(HASHES, ShortBuffer.class);
+               addField(TIME, Long.class);
        }};

-       public static final Message createFNPSentPackets(int[] timeDeltas, 
long[] hashes) {
+       public static final Message createFNPSentPackets(int[] timeDeltas, 
long[] hashes, long baseTime) {
                Message msg = new Message(FNPSentPackets);
                msg.set(TIME_DELTAS, new 
ShortBuffer(Fields.intsToBytes(timeDeltas)));
                msg.set(HASHES, new ShortBuffer(Fields.longsToBytes(hashes)));
+               msg.set(TIME, baseTime);
                return msg;
        }


Added: trunk/freenet/src/freenet/io/comm/PortForwardSensitiveSocketHandler.java
===================================================================
--- trunk/freenet/src/freenet/io/comm/PortForwardSensitiveSocketHandler.java    
                        (rev 0)
+++ trunk/freenet/src/freenet/io/comm/PortForwardSensitiveSocketHandler.java    
2007-11-28 21:08:37 UTC (rev 16045)
@@ -0,0 +1,9 @@
+package freenet.io.comm;
+
+public interface PortForwardSensitiveSocketHandler extends SocketHandler {
+
+       /** Something has changed at a higher level suggesting the port 
forwarding status may be bogus,
+        * so we need to rescan. */
+       void rescanPortForward();
+
+}

Modified: trunk/freenet/src/freenet/io/comm/UdpSocketHandler.java
===================================================================
--- trunk/freenet/src/freenet/io/comm/UdpSocketHandler.java     2007-11-28 
20:22:38 UTC (rev 16044)
+++ trunk/freenet/src/freenet/io/comm/UdpSocketHandler.java     2007-11-28 
21:08:37 UTC (rev 16045)
@@ -12,6 +12,7 @@
 import org.tanukisoftware.wrapper.WrapperManager;

 import freenet.io.AddressTracker;
+import freenet.io.PortForwardBrokenDetector;
 import freenet.io.comm.Peer.LocalAddressException;
 import freenet.node.LoggingConfigHandler;
 import freenet.node.Node;
@@ -20,7 +21,7 @@
 import freenet.support.Logger;
 import freenet.support.OOMHandler;

-public class UdpSocketHandler extends Thread implements PacketSocketHandler {
+public class UdpSocketHandler extends Thread implements PacketSocketHandler, 
PortForwardSensitiveSocketHandler {

        private final DatagramSocket _sock;
        private final InetAddress _bindTo;
@@ -398,4 +399,12 @@
                return tracker;
        }

+       public void rescanPortForward() {
+               tracker.rescan();
+       }
+
+       public void setPortForwardBrokenDetector(PortForwardBrokenDetector 
detector) {
+               tracker.setBrokenDetector(detector);
+       }
+
 }

Modified: trunk/freenet/src/freenet/node/Node.java
===================================================================
--- trunk/freenet/src/freenet/node/Node.java    2007-11-28 20:22:38 UTC (rev 
16044)
+++ trunk/freenet/src/freenet/node/Node.java    2007-11-28 21:08:37 UTC (rev 
16045)
@@ -48,6 +48,7 @@
 import freenet.crypt.RandomSource;
 import freenet.crypt.SHA256;
 import freenet.crypt.Yarrow;
+import freenet.io.PortForwardBrokenDetector;
 import freenet.io.comm.DMT;
 import freenet.io.comm.DisconnectedException;
 import freenet.io.comm.FreenetInetAddress;

Modified: trunk/freenet/src/freenet/node/NodeCrypto.java
===================================================================
--- trunk/freenet/src/freenet/node/NodeCrypto.java      2007-11-28 20:22:38 UTC 
(rev 16044)
+++ trunk/freenet/src/freenet/node/NodeCrypto.java      2007-11-28 21:08:37 UTC 
(rev 16045)
@@ -20,6 +20,7 @@
 import freenet.crypt.Global;
 import freenet.crypt.RandomSource;
 import freenet.crypt.SHA256;
+import freenet.io.PortForwardBrokenDetector;
 import freenet.io.comm.FreenetInetAddress;
 import freenet.io.comm.Peer;
 import freenet.io.comm.UdpSocketHandler;
@@ -78,7 +79,7 @@
         * Get port number from a config, create socket and packet mangler
         * @throws NodeInitException 
         */
-       public NodeCrypto(Node node, boolean isOpennet, NodeCryptoConfig 
config, long startupTime) throws NodeInitException {
+       public NodeCrypto(final Node node, final boolean isOpennet, 
NodeCryptoConfig config, long startupTime) throws NodeInitException {

                this.node = node;
                this.config = config;
@@ -127,6 +128,18 @@
                }
                socket = u;

+               u.setPortForwardBrokenDetector(new PortForwardBrokenDetector() {
+                       public boolean isBroken() {
+                               PeerManager pm = node.peers;
+                               if(pm == null) return false;
+                               PeerNode[] peers = isOpennet ? 
((PeerNode[])pm.getOpennetPeers()) : ((PeerNode[])pm.getDarknetPeers());
+                               for(int i=0;i<peers.length;i++) {
+                                       
if(peers[i].manyPacketsClaimedSentNotReceived()) return true;
+                               }
+                               return false;
+                       }
+               });
+               
                Logger.normal(this, "FNP port created on "+bindto+ ':' +port);
                System.out.println("FNP port created on "+bindto+ ':' +port);
                portNumber = port;

Modified: trunk/freenet/src/freenet/node/NodeDispatcher.java
===================================================================
--- trunk/freenet/src/freenet/node/NodeDispatcher.java  2007-11-28 20:22:38 UTC 
(rev 16044)
+++ trunk/freenet/src/freenet/node/NodeDispatcher.java  2007-11-28 21:08:37 UTC 
(rev 16045)
@@ -85,6 +85,9 @@
                        return true;
                } else if(spec == DMT.FNPTime) {
                        return handleTime(m, source);
+               } else if(spec == DMT.FNPSentPackets) {
+                       source.handleSentPackets(m);
+                       return true;
                } else if(spec == DMT.FNPVoid) {
                        return true;
                } else if(spec == DMT.FNPDisconnect) {

Modified: trunk/freenet/src/freenet/node/PeerNode.java
===================================================================
--- trunk/freenet/src/freenet/node/PeerNode.java        2007-11-28 20:22:38 UTC 
(rev 16044)
+++ trunk/freenet/src/freenet/node/PeerNode.java        2007-11-28 21:08:37 UTC 
(rev 16045)
@@ -45,6 +45,7 @@
 import freenet.io.comm.Peer;
 import freenet.io.comm.PeerContext;
 import freenet.io.comm.PeerParseException;
+import freenet.io.comm.PortForwardSensitiveSocketHandler;
 import freenet.io.comm.ReferenceSignatureVerificationException;
 import freenet.io.comm.SocketHandler;
 import freenet.io.xfer.PacketThrottle;
@@ -58,6 +59,7 @@
 import freenet.support.IllegalBase64Exception;
 import freenet.support.LRUHashtable;
 import freenet.support.Logger;
+import freenet.support.ShortBuffer;
 import freenet.support.SimpleFieldSet;
 import freenet.support.WouldBlockException;
 import freenet.support.math.RunningAverage;
@@ -1689,7 +1691,7 @@
                long[] newHashes = new long[hashes.length - skip];
                System.arraycopy(hashes, skip, newHashes, 0, hashes.length - 
skip);
        }
-       return DMT.createFNPSentPackets(timeDeltas, hashes);
+       return DMT.createFNPSentPackets(timeDeltas, hashes, now);
        }

        private void sendIPAddressMessage() {
@@ -2979,4 +2981,95 @@
                return new long[][] { times, hashes };
        }

+       static final int SENT_PACKETS_MAX_TIME_AFTER_CONNECT = 5*60*1000;
+       
+       /**
+        * Handle an FNPSentPackets message
+        */
+       public void handleSentPackets(Message m) {
+               long now = System.currentTimeMillis();
+               synchronized(this) {
+                       if(forceDisconnectCalled) return;
+                       if(now - this.timeLastConnected < 
SENT_PACKETS_MAX_TIME_AFTER_CONNECT) return;
+               }
+               long baseTime = m.getLong(DMT.TIME);
+               baseTime += this.clockDelta;
+               // Should be a reasonable approximation now
+               int[] timeDeltas = Fields.bytesToInts(((ShortBuffer) 
m.getObject(DMT.TIME_DELTAS)).getData());
+               long[] packetHashes = Fields.bytesToLongs(((ShortBuffer) 
m.getObject(DMT.TIME_DELTAS)).getData());
+               long[] times = new long[timeDeltas.length];
+               for(int i=0;i<times.length;i++)
+                       times[i] = baseTime - timeDeltas[i];
+               long tolerance = 60*1000 + (Math.abs(timeDeltas[0]) / 20); // 1 
minute or 5% of full interval
+               synchronized(this) {
+                       // They are in increasing order
+                       // Loop backwards
+                       long otime = Long.MAX_VALUE;
+                       long[][] sent = getSentPacketTimesHashes();
+                       long[] sentTimes = sent[0];
+                       long[] sentHashes = sent[1];
+                       short sentPtr = (short)(sent.length-1);
+                       short notFoundCount = 0;
+                       short consecutiveNotFound = 0;
+                       short longestConsecutiveNotFound = 0;
+                       for(short i=(short)times.length;i>=0;i--) {
+                               long time = times[i];
+                               if(time > otime) {
+                                       Logger.error(this, "Inconsistent time 
order: ["+i+"]="+time+" but ["+(i+1)+"] is "+otime);
+                                       return;
+                               } else otime = time;
+                               long hash = packetHashes[i];
+                               // Search for the hash.
+                               short match = -1;
+                               // First try forwards
+                               for(short j=sentPtr;j<sentTimes.length;j++) {
+                                       long ttime = sentTimes[j];
+                                       if(sentHashes[j] == hash) {
+                                               match = j;
+                                               sentPtr = j;
+                                               break;
+                                       }
+                                       if(ttime - time > tolerance) break;
+                               }
+                               if(match == -1) {
+                                       for(short 
j=(short)(sentPtr-1);j>=0;j--) {
+                                               long ttime = sentTimes[j];
+                                               if(sentHashes[j] == hash) {
+                                                       match = j;
+                                                       sentPtr = j;
+                                                       break;
+                                               }
+                                               if(time - ttime > tolerance) 
break;
+                                       }
+                               }
+                               if(match == -1) {
+                                       // Not found
+                                       consecutiveNotFound++;
+                                       notFoundCount++;
+                               } else {
+                                       if(consecutiveNotFound > 
longestConsecutiveNotFound)
+                                               longestConsecutiveNotFound = 
consecutiveNotFound;
+                                       consecutiveNotFound = 0;
+                               }
+                       }
+                       if(consecutiveNotFound > longestConsecutiveNotFound)
+                               longestConsecutiveNotFound = 
consecutiveNotFound;
+                       if(consecutiveNotFound > TRACK_PACKETS / 2) {
+                               manyPacketsClaimedSentNotReceived = true;
+                               Logger.error(this, ""+consecutiveNotFound+" 
consecutive packets not found on "+userToString());
+                               SocketHandler handler = 
outgoingMangler.getSocketHandler();
+                               if(handler instanceof 
PortForwardSensitiveSocketHandler) {
+                                       
((PortForwardSensitiveSocketHandler)handler).rescanPortForward();
+                               }
+                       }
+               }
+               // TODO Auto-generated method stub
+               
+       }
+
+       private boolean manyPacketsClaimedSentNotReceived = false;
+       
+       synchronized boolean manyPacketsClaimedSentNotReceived() {
+               return manyPacketsClaimedSentNotReceived;
+       }
 }


Reply via email to