Author: robert
Date: 2008-01-07 20:07:30 +0000 (Mon, 07 Jan 2008)
New Revision: 16959

Modified:
   trunk/freenet/src/freenet/node/MessageItem.java
   trunk/freenet/src/freenet/node/PeerNode.java
   trunk/freenet/src/freenet/node/RequestSender.java
Log:
implement conditionalSend: sendSync-with-timeout, aborts message send


Modified: trunk/freenet/src/freenet/node/MessageItem.java
===================================================================
--- trunk/freenet/src/freenet/node/MessageItem.java     2008-01-07 19:31:24 UTC 
(rev 16958)
+++ trunk/freenet/src/freenet/node/MessageItem.java     2008-01-07 20:07:30 UTC 
(rev 16959)
@@ -72,4 +72,8 @@
                        }
                }
        }
+       
+       public boolean isForMessage(Message msg) {
+               return this.msg.equals(msg);
+       }
 }

Modified: trunk/freenet/src/freenet/node/PeerNode.java
===================================================================
--- trunk/freenet/src/freenet/node/PeerNode.java        2008-01-07 19:31:24 UTC 
(rev 16958)
+++ trunk/freenet/src/freenet/node/PeerNode.java        2008-01-07 20:07:30 UTC 
(rev 16959)
@@ -16,6 +16,7 @@
 import java.util.Hashtable;
 import java.util.Iterator;
 import java.util.LinkedList;
+import java.util.ListIterator;
 import java.util.Vector;
 import java.util.zip.DataFormatException;
 import java.util.zip.Inflater;
@@ -968,6 +969,21 @@
                // It will wake up before the maximum coalescing delay (100ms) 
because
                // it wakes up every 100ms *anyway*.
        }
+       
+       private boolean maybeRemoveMessageFromQueue(Message removeMe) {
+               Logger.normal(this, "attempting to remove message from 
send-queue: "+removeMe);
+               synchronized (messagesToSendNow) {
+                       ListIterator i=messagesToSendNow.listIterator();
+                       while (i.hasNext()) {
+                               MessageItem it=(MessageItem)i.next();
+                               if (it.isForMessage(removeMe)) {
+                                       i.remove();
+                                       return true;
+                               }
+                       }
+               }
+               return false;
+       }

        public long getMessageQueueLengthBytes() {
                long x = 0;
@@ -1396,6 +1412,36 @@
        }

        /**
+        * Conceptually, send a message to this node IF it can be done within 
'timeout', returning true
+        * only after the message was sent (similar to sendSync), and false if 
the message cannot be
+        * sent to the node in that time period. As an optimization, however, 
this function may return
+        * immediately if it is determined that the message would not leave the 
node within the timeout
+        * period.
+        */
+       public boolean conditionalSend(Message req, ByteCounter ctr, long 
timeout) throws NotConnectedException {
+               if (timeout<=0)
+                       return false;
+               if 
(getMessageQueueLengthBytes()/(getThrottle().getBandwidth()+1.0) > timeout) {
+                       Logger.normal(this, "conditionalSend; pre-emptively not 
sending message ("+timeout+"ms): "+req);
+                       return false;
+               }
+               SyncMessageCallback cb = new SyncMessageCallback();
+               sendAsync(req, cb, 0, ctr);
+               cb.waitForSend(timeout);
+               if (cb.done) {
+                       return true;
+               } else {
+                       //best-effort: remove the message from the send queue; 
it is ok if we can't prematurely
+                       //remove the item (i.e. race condition / now it is 
sent), but it will generate unclaimed messages, etc.
+                       if (!maybeRemoveMessageFromQueue(req))
+                               Logger.error(this, "unable to stop transmition 
of request: "+req);
+                       else
+                               Logger.normal(this, "removed request from queue 
for timeout: "+req);
+                       return false;
+               }
+       }
+       
+       /**
        * Enqueue a message to be sent to this node and wait up to a minute for 
it to be transmitted.
        */
        public void sendSync(Message req, ByteCounter ctr) throws 
NotConnectedException {
@@ -1426,7 +1472,7 @@
                                }
                        }
                        if(isConnected())
-                               Logger.error(this, "Waited too long for a 
blocking send on " + this + " for " + PeerNode.this, new Exception("error"));
+                               Logger.normal(this, "Waited too long for a 
blocking send on " + this + " for " + PeerNode.this, new Exception("error"));
                }

                public void acknowledged() {

Modified: trunk/freenet/src/freenet/node/RequestSender.java
===================================================================
--- trunk/freenet/src/freenet/node/RequestSender.java   2008-01-07 19:31:24 UTC 
(rev 16958)
+++ trunk/freenet/src/freenet/node/RequestSender.java   2008-01-07 20:07:30 UTC 
(rev 16959)
@@ -4,6 +4,7 @@
 package freenet.node;

 import java.util.HashSet;
+import java.util.ArrayList;

 import freenet.crypt.CryptFormatException;
 import freenet.crypt.DSAPublicKey;
@@ -46,6 +47,8 @@
 public final class RequestSender implements Runnable, ByteCounter {

     // Constants
+       //SEND_TIMEOUT is not a hard timeout, shoot low for low latency 
(250-500ms?).
+       static final int SEND_TIMEOUT = 1000;
     static final int ACCEPTED_TIMEOUT = 5000;
     static final int FETCH_TIMEOUT = 120000;
     /** Wait up to this long to get a path folding reply */
@@ -141,6 +144,7 @@
                int rejectOverloads=0;
         HashSet nodesRoutedTo = new HashSet();
         HashSet nodesNotIgnored = new HashSet();
+               ArrayList busyPeers = new ArrayList();
         while(true) {
             if(logMINOR) Logger.minor(this, "htl="+htl);
             if(htl == 0) {
@@ -159,9 +163,24 @@
                        routeAttempts++;

             // Route it
+                       long sendTimeout = SEND_TIMEOUT;
+                       boolean usingBusyPeer=false;
             PeerNode next;
             next = node.peers.closerPeer(source, nodesRoutedTo, 
nodesNotIgnored, target, true, node.isAdvancedModeEnabled(), -1, null);

+                       if (next == null && !busyPeers.isEmpty()) {
+                               next = (PeerNode)busyPeers.remove(0);
+                               usingBusyPeer=true;
+                               if (logMINOR) Logger.minor(this, "trying 
previously-found busy peer: "+next);
+                               //NOTE: if we are at this point, it is already 
presumed that the message cannot even make it off the node to this peer in 
SEND_TIMEOUT, use all the timeout we have left.
+                               sendTimeout = 
FETCH_TIMEOUT-(System.currentTimeMillis()-startTime);
+                               //Edge case, local request & we are running w/o 
any time left.
+                               if (sendTimeout < SEND_TIMEOUT && source==null) 
{
+                                       if (logMINOR) Logger.minor(this, 
"increasing timeout for local request");
+                                       sendTimeout = 2*SEND_TIMEOUT;
+                               }
+                       }
+                       
             if(next == null) {
                                if (logMINOR && rejectOverloads>0)
                                        Logger.minor(this, "no more peers, but 
overloads ("+rejectOverloads+"/"+routeAttempts+" overloaded)");
@@ -187,11 +206,18 @@
             // So take it from when we first started to try to send the 
request.
             // See comments below when handling FNPRecentlyFailed for why we 
need this.
             long timeSentRequest = System.currentTimeMillis();
-            
+                       
             try {
                //This is the first contact to this node
                //async is preferred, but makes ACCEPTED_TIMEOUT much more 
likely for long send queues.
-               next.sendAsync(req, null, 0, this);
+                               //using conditionalSend this way might actually 
approximate Q-routing load balancing across the network.
+               if (!next.conditionalSend(req, this, sendTimeout)) {
+                                       if (usingBusyPeer)
+                                               continue;
+                                       Logger.normal(this, "will try this peer 
later if no others are available");
+                                       busyPeers.add(next);
+                                       continue;
+                               }
             } catch (NotConnectedException e) {
                Logger.minor(this, "Not connected");
                continue;


Reply via email to