Author: toad
Date: 2005-12-06 16:19:14 +0000 (Tue, 06 Dec 2005)
New Revision: 7681

Modified:
   trunk/freenet/src/freenet/io/comm/LowLevelFilterException.java
   trunk/freenet/src/freenet/node/FNPPacketMangler.java
   trunk/freenet/src/freenet/node/Node.java
   trunk/freenet/src/freenet/node/PacketSender.java
   trunk/freenet/src/freenet/node/PeerNode.java
   trunk/freenet/src/freenet/node/Version.java
   trunk/freenet/src/freenet/support/WouldBlockException.java
Log:
280: (mandatory)
Probabilistic reject of requests/inserts:
- Under 1000ms ping, don't reject any
- Over 2000ms, reject all
- Between the two, probabilistic reject according to ping
- Accept at least one request every 10 seconds regardless to ensure that the 
throttled packet send time gets updated occasionally, and we don't get stuck
Don't do blocking allocation of packet numbers on the packet sender thread! 
This would cause the whole node to block, not able to send any other packets...
=> Lots of methods in FNPPacketMangler can throw WouldBlockException.
Delete some dead code.
Refactor in PacketSender, and handle exceptions better.


Modified: trunk/freenet/src/freenet/io/comm/LowLevelFilterException.java
===================================================================
--- trunk/freenet/src/freenet/io/comm/LowLevelFilterException.java      
2005-12-06 14:23:03 UTC (rev 7680)
+++ trunk/freenet/src/freenet/io/comm/LowLevelFilterException.java      
2005-12-06 16:19:14 UTC (rev 7681)
@@ -6,4 +6,8 @@
                super(string);
        }

+       public LowLevelFilterException() {
+               super();
+       }
+
 }

Modified: trunk/freenet/src/freenet/node/FNPPacketMangler.java
===================================================================
--- trunk/freenet/src/freenet/node/FNPPacketMangler.java        2005-12-06 
14:23:03 UTC (rev 7680)
+++ trunk/freenet/src/freenet/node/FNPPacketMangler.java        2005-12-06 
16:19:14 UTC (rev 7681)
@@ -938,8 +938,9 @@
      * Build a packet and send it. From a Message recently converted into 
byte[],
      * but with no outer formatting.
      * @throws PacketSequenceException 
+     * @throws WouldBlockException 
      */
-    public void processOutgoing(byte[] buf, int offset, int length, 
PeerContext peer) throws NotConnectedException, PacketSequenceException {
+    public void processOutgoing(byte[] buf, int offset, int length, 
PeerContext peer) throws NotConnectedException, PacketSequenceException, 
WouldBlockException {
        Logger.minor(this, "processOutgoing(buf, "+offset+", "+length+", 
"+peer.getPeer());
         if(!(peer instanceof PeerNode))
             throw new IllegalArgumentException();
@@ -953,8 +954,9 @@
      * Build a packet and send it. From a Message recently converted into 
byte[],
      * but with no outer formatting.
      * @throws PacketSequenceException 
+     * @throws WouldBlockException 
      */
-    public void processOutgoing(byte[] buf, int offset, int length, KeyTracker 
tracker) throws KeyChangedException, NotConnectedException, 
PacketSequenceException {
+    public void processOutgoing(byte[] buf, int offset, int length, KeyTracker 
tracker) throws KeyChangedException, NotConnectedException, 
PacketSequenceException, WouldBlockException {
         byte[] newBuf = preformat(buf, offset, length);
         processOutgoingPreformatted(newBuf, 0, newBuf.length, tracker, -1, 
null);
     }
@@ -963,8 +965,9 @@
     /**
      * Send a packet, with a packet number.
      * @throws PacketSequenceException 
+     * @throws WouldBlockException If allocating a packet number would have 
blocked.
      */
-    public void processOutgoing(byte[] buf, int offset, int length, KeyTracker 
tracker, int packetNo, AsyncMessageCallback[] callbacks) throws 
KeyChangedException, NotConnectedException, PacketSequenceException {
+    public void processOutgoing(byte[] buf, int offset, int length, KeyTracker 
tracker, int packetNo, AsyncMessageCallback[] callbacks) throws 
KeyChangedException, NotConnectedException, PacketSequenceException, 
WouldBlockException {
         byte[] newBuf = preformat(buf, offset, length);
         processOutgoingPreformatted(newBuf, 0, newBuf.length, tracker, 
packetNo, callbacks);
     }
@@ -973,8 +976,9 @@
      * Send a packet using the current key. Retry if it fails solely because
      * the key changes.
      * @throws PacketSequenceException 
+     * @throws WouldBlockException 
      */
-    void processOutgoingPreformatted(byte[] buf, int offset, int length, 
PeerNode peer, int k, AsyncMessageCallback[] callbacks) throws 
NotConnectedException, PacketSequenceException {
+    void processOutgoingPreformatted(byte[] buf, int offset, int length, 
PeerNode peer, int k, AsyncMessageCallback[] callbacks) throws 
NotConnectedException, PacketSequenceException, WouldBlockException {
         while(true) {
             try {
                Logger.minor(this, "At beginning of processOutgoingPreformatted 
loop for "+peer.getPeer());
@@ -1049,8 +1053,9 @@
      * @throws NotConnectedException If the node is not connected.
      * @throws KeyChangedException If the primary key changes while we are 
trying to send this packet.
      * @throws PacketSequenceException 
+     * @throws WouldBlockException If we cannot allocate a packet number 
because it would block.
      */
-    public synchronized void processOutgoingPreformatted(byte[] buf, int 
offset, int length, KeyTracker tracker, int packetNumber, 
AsyncMessageCallback[] callbacks) throws KeyChangedException, 
NotConnectedException, PacketSequenceException {
+    public void processOutgoingPreformatted(byte[] buf, int offset, int 
length, KeyTracker tracker, int packetNumber, AsyncMessageCallback[] callbacks) 
throws KeyChangedException, NotConnectedException, PacketSequenceException, 
WouldBlockException {
         if(Logger.shouldLog(Logger.MINOR, this)) {
             String log = 
"processOutgoingPreformatted("+Fields.hashCode(buf)+", 
"+offset+","+length+","+tracker+","+packetNumber+",";
             if(callbacks == null) log += "null";
@@ -1084,7 +1089,7 @@
                                // Ack/resendreq only packet
                                seqNumber = -1;
                        else
-                               seqNumber = 
tracker.allocateOutgoingPacketNumber();
+                               seqNumber = 
tracker.allocateOutgoingPacketNumberNeverBlock();
                }

                Logger.minor(this, "Sequence number (sending): "+seqNumber+" 
("+packetNumber+") to "+tracker.pn.getPeer());

Modified: trunk/freenet/src/freenet/node/Node.java
===================================================================
--- trunk/freenet/src/freenet/node/Node.java    2005-12-06 14:23:03 UTC (rev 
7680)
+++ trunk/freenet/src/freenet/node/Node.java    2005-12-06 16:19:14 UTC (rev 
7681)
@@ -97,10 +97,16 @@
     public static final int RANDOMIZED_TIME_BETWEEN_VERSION_PROBES = 
HANDSHAKE_TIMEOUT*2; // 20-30 secs
     // If we don't receive any packets at all in this period, from any node, 
tell the user
     public static final long ALARM_TIME = 60*1000;
+    /** Sub-max ping time. If ping is greater than this, we reject some 
requests. */
+    public static final long SUB_MAX_PING_TIME = 1000;
     /** Maximum overall average ping time. If ping is greater than this,
      * we reject all requests.
      */
-    public static final long MAX_PING_TIME = 1000;
+    public static final long MAX_PING_TIME = 2000;
+    /** Accept one request every 10 seconds regardless, to ensure we update the
+     * block send time.
+     */
+    public static final int MAX_INTERREQUEST_TIME = 10*1000;

     // 900ms
     static final int MIN_INTERVAL_BETWEEN_INCOMING_SWAP_REQUESTS = 900;
@@ -121,9 +127,6 @@
     /** IP address detector */
     private final IPAddressDetector ipDetector;

-    /** Locally published stream contexts */
-    private final Hashtable localStreamContexts;
-    
     private final HashSet runningUIDs;

     byte[] myIdentity; // FIXME: simple identity block; should be unique
@@ -375,7 +378,6 @@
         decrementAtMax = random.nextDouble() <= DECREMENT_AT_MAX_PROB;
         decrementAtMin = random.nextDouble() <= DECREMENT_AT_MIN_PROB;
         bootID = random.nextLong();
-        localStreamContexts = new Hashtable();
         peers.writePeers();
         try {
                String dirName = "temp-"+portNumber;
@@ -610,8 +612,25 @@
         }
     }

-    public boolean shouldRejectRequest() {
-       return nodePinger.averagePingTime() > MAX_PING_TIME;
+    long lastAcceptedRequest = -1;
+    
+    public synchronized boolean shouldRejectRequest() {
+       long now = System.currentTimeMillis();
+       double pingTime = nodePinger.averagePingTime();
+       if(pingTime > MAX_PING_TIME) {
+               if(now - lastAcceptedRequest > MAX_INTERREQUEST_TIME) {
+                       lastAcceptedRequest = now;
+                       return false;
+               }
+               return true;
+       }
+       if(pingTime > SUB_MAX_PING_TIME) {
+               double x = (pingTime - SUB_MAX_PING_TIME) / (MAX_PING_TIME - 
SUB_MAX_PING_TIME);
+               if(random.nextDouble() < x)
+                       return true;
+       }
+       lastAcceptedRequest = now;
+       return false;
     }

     /**

Modified: trunk/freenet/src/freenet/node/PacketSender.java
===================================================================
--- trunk/freenet/src/freenet/node/PacketSender.java    2005-12-06 14:23:03 UTC 
(rev 7680)
+++ trunk/freenet/src/freenet/node/PacketSender.java    2005-12-06 16:19:14 UTC 
(rev 7681)
@@ -4,6 +4,7 @@

 import freenet.io.comm.NotConnectedException;
 import freenet.support.Logger;
+import freenet.support.WouldBlockException;

 /**
  * @author amphibian
@@ -19,6 +20,7 @@
     final Node node;
     long lastClearedOldSwapChains;
     long lastReportedNoPackets;
+    long lastReceivedPacketFromAnyNode;

     PacketSender(Node node) {
         resendPackets = new LinkedList();
@@ -33,117 +35,135 @@

     public void run() {
         while(true) {
-            long lastReceivedPacketFromAnyNode = lastReportedNoPackets;
+            lastReceivedPacketFromAnyNode = lastReportedNoPackets;
             try {
-                long now = System.currentTimeMillis();
-                PeerManager pm = node.peers;
-                PeerNode[] nodes = pm.myPeers;
-                long nextActionTime = Long.MAX_VALUE;
-                for(int i=0;i<nodes.length;i++) {
-                    PeerNode pn = nodes[i];
-                    lastReceivedPacketFromAnyNode =
-                        Math.max(pn.lastReceivedPacketTime(), 
lastReceivedPacketFromAnyNode);
-                    if(pn.isConnected()) {
-                        // Is the node dead?
-                        if(now - pn.lastReceivedPacketTime() > 
pn.maxTimeBetweenReceivedPackets()) {
-                            pn.disconnected();
-                            continue;
-                        }
-                        
-                        // Any urgent notifications to send?
-                        long urgentTime = pn.getNextUrgentTime();
-                        if(urgentTime <= now) {
-                            // Send them
-                            pn.sendAnyUrgentNotifications();
-                        } else {
-                            nextActionTime = Math.min(nextActionTime, 
urgentTime);
-                        }
-                        
-                        // Any packets to resend?
-                        for(int j=0;j<2;j++) {
-                            KeyTracker kt;
-                            if(j == 0) kt = pn.getCurrentKeyTracker();
-                            else if(j == 1) kt = pn.getPreviousKeyTracker();
-                            else break; // impossible
-                            if(kt == null) continue;
-                            ResendPacketItem[] resendItems = 
kt.grabResendPackets();
-                            if(resendItems == null) continue;
-                            for(int k=0;k<resendItems.length;k++) {
-                                ResendPacketItem item = resendItems[k];
-                                if(item == null) continue;
-                                try {
-                                    Logger.minor(this, "Resending 
"+item.packetNumber+" to "+item.kt);
-                                    
node.packetMangler.processOutgoingPreformatted(item.buf, 0, item.buf.length, 
item.kt, item.packetNumber, item.callbacks);
-                                } catch (KeyChangedException e) {
-                                    Logger.error(this, "Caught "+e+" resending 
packets to "+kt);
-                                    pn.requeueResendItems(resendItems);
-                                    break;
-                                } catch (NotConnectedException e) {
-                                    Logger.normal(this, "Caught "+e+" 
resending packets to "+kt);
-                                    pn.requeueResendItems(resendItems);
-                                    break;
-                                }
-                            }
-                            
-                        }
+                realRun();
+            } catch (Throwable t) {
+                Logger.error(this, "Caught in PacketSender: "+t, t);
+            }
+        }
+    }

-                        if(node.packetMangler == null) continue;
-                        // Any messages to send?
-                        MessageItem[] messages = null;
-                        messages = pn.grabQueuedMessageItems();
-                        if(messages != null) {
-                            // Send packets, right now, blocking, including 
any active notifications
-                            
node.packetMangler.processOutgoingOrRequeue(messages, pn, true);
-                            continue;
-                        }
-                        
-                        // Need to send a keepalive packet?
-                        if(now - pn.lastSentPacketTime() > 
Node.KEEPALIVE_INTERVAL) {
-                            Logger.minor(this, "Sending keepalive");
-                            node.packetMangler.processOutgoing(null, 0, 0, pn);
-                        }
-                    } else {
-                        // Not connected
-                        // Send handshake if necessary
-                        if(pn.shouldSendHandshake())
-                            node.packetMangler.sendHandshake(pn);
-                    }
-               }
-               
-                if(now - lastClearedOldSwapChains > 10000) {
-                    node.lm.clearOldSwapChains();
-                    lastClearedOldSwapChains = now;
+    private void realRun() {
+        long now = System.currentTimeMillis();
+        PeerManager pm = node.peers;
+        PeerNode[] nodes = pm.myPeers;
+        long nextActionTime = Long.MAX_VALUE;
+        for(int i=0;i<nodes.length;i++) {
+            PeerNode pn = nodes[i];
+            lastReceivedPacketFromAnyNode =
+                Math.max(pn.lastReceivedPacketTime(), 
lastReceivedPacketFromAnyNode);
+            if(pn.isConnected()) {
+                // Is the node dead?
+                if(now - pn.lastReceivedPacketTime() > 
pn.maxTimeBetweenReceivedPackets()) {
+                    pn.disconnected();
+                    continue;
                 }
-                // Send may have taken some time
-                now = System.currentTimeMillis();
-                long sleepTime = nextActionTime - now;
-                // 200ms maximum sleep time
-                sleepTime = Math.min(sleepTime, 200);

-                if(now - node.startupTime > 60*1000*5) {
-                    if(now - lastReceivedPacketFromAnyNode > Node.ALARM_TIME) {
-                        Logger.error(this, "Have not received any packets from 
any node in last "+Node.ALARM_TIME/1000+" seconds");
-                        lastReportedNoPackets = now;
+                // Any urgent notifications to send?
+                long urgentTime = pn.getNextUrgentTime();
+                if(urgentTime <= now) {
+                    // Send them
+                    pn.sendAnyUrgentNotifications();
+                } else {
+                    nextActionTime = Math.min(nextActionTime, urgentTime);
+                }
+                
+                // Any packets to resend?
+                for(int j=0;j<2;j++) {
+                    KeyTracker kt;
+                    if(j == 0) kt = pn.getCurrentKeyTracker();
+                    else if(j == 1) kt = pn.getPreviousKeyTracker();
+                    else break; // impossible
+                    if(kt == null) continue;
+                    ResendPacketItem[] resendItems = kt.grabResendPackets();
+                    if(resendItems == null) continue;
+                    for(int k=0;k<resendItems.length;k++) {
+                        ResendPacketItem item = resendItems[k];
+                        if(item == null) continue;
+                        try {
+                            Logger.minor(this, "Resending 
"+item.packetNumber+" to "+item.kt);
+                            
node.packetMangler.processOutgoingPreformatted(item.buf, 0, item.buf.length, 
item.kt, item.packetNumber, item.callbacks);
+                        } catch (KeyChangedException e) {
+                            Logger.error(this, "Caught "+e+" resending packets 
to "+kt);
+                            pn.requeueResendItems(resendItems);
+                            break;
+                        } catch (NotConnectedException e) {
+                            Logger.normal(this, "Caught "+e+" resending 
packets to "+kt);
+                            pn.requeueResendItems(resendItems);
+                            break;
+                        } catch (PacketSequenceException e) {
+                               Logger.error(this, "Caught "+e+" - 
disconnecting", e);
+                               pn.forceDisconnect();
+                                               } catch (WouldBlockException e) 
{
+                                                       Logger.error(this, 
"Impossible: "+e, e);
+                                               }
                     }
+                    
                 }
+
+                if(node.packetMangler == null) continue;
+                // Any messages to send?
+                MessageItem[] messages = null;
+                messages = pn.grabQueuedMessageItems();
+                if(messages != null) {
+                    // Send packets, right now, blocking, including any active 
notifications
+                    node.packetMangler.processOutgoingOrRequeue(messages, pn, 
true);
+                    continue;
+                }

-                if(sleepTime > 0) {
+                // Need to send a keepalive packet?
+                if(now - pn.lastSentPacketTime() > Node.KEEPALIVE_INTERVAL) {
+                    Logger.minor(this, "Sending keepalive");
                     try {
-                        synchronized(this) {
-                            wait(sleepTime);
-                        }
-                    } catch (InterruptedException e) {
-                        // Ignore, just wake up. Probably we got interrupt()ed
-                        // because a new packet came in.
-                    }
+                                               
node.packetMangler.processOutgoing(null, 0, 0, pn);
+                                       } catch (PacketSequenceException e) {
+                       Logger.error(this, "Caught "+e+" - disconnecting", e);
+                       pn.forceDisconnect();
+                                       } catch (WouldBlockException e) {
+                                               Logger.error(this, "Impossible: 
"+e, e);
+                                       } catch (NotConnectedException e) {
+                                               // Ignore: no point sending a 
keepalive now! :)
+                                       }
                 }
-            } catch (Throwable t) {
-                Logger.error(this, "Caught in PacketSender: "+t, t);
+            } else {
+                // Not connected
+                // Send handshake if necessary
+                if(pn.shouldSendHandshake())
+                    node.packetMangler.sendHandshake(pn);
             }
+       }
+       
+        if(now - lastClearedOldSwapChains > 10000) {
+            node.lm.clearOldSwapChains();
+            lastClearedOldSwapChains = now;
         }
-    }
+        // Send may have taken some time
+        now = System.currentTimeMillis();
+        long sleepTime = nextActionTime - now;
+        // 200ms maximum sleep time
+        sleepTime = Math.min(sleepTime, 200);
+        
+        if(now - node.startupTime > 60*1000*5) {
+            if(now - lastReceivedPacketFromAnyNode > Node.ALARM_TIME) {
+                Logger.error(this, "Have not received any packets from any 
node in last "+Node.ALARM_TIME/1000+" seconds");
+                lastReportedNoPackets = now;
+            }
+        }
+        
+        if(sleepTime > 0) {
+            try {
+                synchronized(this) {
+                    wait(sleepTime);
+                }
+            } catch (InterruptedException e) {
+                // Ignore, just wake up. Probably we got interrupt()ed
+                // because a new packet came in.
+            }
+        }
+       }

-    void queuedResendPacket() {
+       void queuedResendPacket() {
         // Wake up if needed
         synchronized(this) {
             notifyAll();

Modified: trunk/freenet/src/freenet/node/PeerNode.java
===================================================================
--- trunk/freenet/src/freenet/node/PeerNode.java        2005-12-06 14:23:03 UTC 
(rev 7680)
+++ trunk/freenet/src/freenet/node/PeerNode.java        2005-12-06 16:19:14 UTC 
(rev 7681)
@@ -30,6 +30,7 @@
 import freenet.support.LRUHashtable;
 import freenet.support.Logger;
 import freenet.support.SimpleFieldSet;
+import freenet.support.WouldBlockException;
 import freenet.support.math.RunningAverage;
 import freenet.support.math.SimpleRunningAverage;

@@ -800,7 +801,7 @@
      * Send a payload-less packet on either key if necessary.
      * @throws PacketSequenceException 
      */
-    public void sendAnyUrgentNotifications() throws PacketSequenceException {
+    public void sendAnyUrgentNotifications() {
         Logger.minor(this, "sendAnyUrgentNotifications");
         long now = System.currentTimeMillis();
         KeyTracker cur, prev;
@@ -818,7 +819,12 @@
                     // Ignore
                 } catch (KeyChangedException e) {
                     // Ignore
-                }
+                } catch (PacketSequenceException e) {
+                       Logger.error(this, "Impossible: "+e, e);
+                               } catch (WouldBlockException e) {
+                                       // TODO Auto-generated catch block
+                                       e.printStackTrace();
+                               }
             }
         }
         tracker = prev;
@@ -831,7 +837,11 @@
                     // Ignore
                 } catch (KeyChangedException e) {
                     // Ignore
-                }
+                               } catch (WouldBlockException e) {
+                                       Logger.error(this, "Impossible: "+e, e);
+                               } catch (PacketSequenceException e) {
+                                       Logger.error(this, "Impossible: "+e, e);
+                               }
             }
         }
     }
@@ -1047,5 +1057,6 @@

        public void reportThrottledPacketSendTime(long timeDiff) {
                throttledPacketSendAverage.report(timeDiff);
+               Logger.minor(this, "Reporting throttled packet send time: 
"+timeDiff+" to "+getPeer());
        }
 }

Modified: trunk/freenet/src/freenet/node/Version.java
===================================================================
--- trunk/freenet/src/freenet/node/Version.java 2005-12-06 14:23:03 UTC (rev 
7680)
+++ trunk/freenet/src/freenet/node/Version.java 2005-12-06 16:19:14 UTC (rev 
7681)
@@ -20,10 +20,10 @@
        public static final String protocolVersion = "1.0";

        /** The build number of the current revision */
-       public static final int buildNumber = 278;
+       public static final int buildNumber = 280;

        /** Oldest build of Fred we will talk to */
-       public static final int lastGoodBuild = 278;
+       public static final int lastGoodBuild = 280;

        /** The highest reported build of fred */
        public static int highestSeenBuild = buildNumber;

Modified: trunk/freenet/src/freenet/support/WouldBlockException.java
===================================================================
--- trunk/freenet/src/freenet/support/WouldBlockException.java  2005-12-06 
14:23:03 UTC (rev 7680)
+++ trunk/freenet/src/freenet/support/WouldBlockException.java  2005-12-06 
16:19:14 UTC (rev 7681)
@@ -1,8 +1,18 @@
 package freenet.support;

+import freenet.io.comm.LowLevelFilterException;
+
 /**
  * Thrown when we would have to block but have been told not to.
  */
-public class WouldBlockException extends Exception {
+public class WouldBlockException extends LowLevelFilterException {
+       public WouldBlockException(String string) {
+               super(string);
+       }
+
+       public WouldBlockException() {
+               super();
+       }
+
        static final long serialVersionUID = -1;
 }


Reply via email to