Author: robert
Date: 2008-01-07 20:07:30 +0000 (Mon, 07 Jan 2008)
New Revision: 16959
Modified:
trunk/freenet/src/freenet/node/MessageItem.java
trunk/freenet/src/freenet/node/PeerNode.java
trunk/freenet/src/freenet/node/RequestSender.java
Log:
implement conditionalSend: sendSync-with-timeout, aborts message send
Modified: trunk/freenet/src/freenet/node/MessageItem.java
===================================================================
--- trunk/freenet/src/freenet/node/MessageItem.java 2008-01-07 19:31:24 UTC
(rev 16958)
+++ trunk/freenet/src/freenet/node/MessageItem.java 2008-01-07 20:07:30 UTC
(rev 16959)
@@ -72,4 +72,8 @@
}
}
}
+
+ public boolean isForMessage(Message msg) {
+ return this.msg.equals(msg);
+ }
}
Modified: trunk/freenet/src/freenet/node/PeerNode.java
===================================================================
--- trunk/freenet/src/freenet/node/PeerNode.java 2008-01-07 19:31:24 UTC
(rev 16958)
+++ trunk/freenet/src/freenet/node/PeerNode.java 2008-01-07 20:07:30 UTC
(rev 16959)
@@ -16,6 +16,7 @@
import java.util.Hashtable;
import java.util.Iterator;
import java.util.LinkedList;
+import java.util.ListIterator;
import java.util.Vector;
import java.util.zip.DataFormatException;
import java.util.zip.Inflater;
@@ -968,6 +969,21 @@
// It will wake up before the maximum coalescing delay (100ms)
because
// it wakes up every 100ms *anyway*.
}
+
+ private boolean maybeRemoveMessageFromQueue(Message removeMe) {
+ Logger.normal(this, "attempting to remove message from
send-queue: "+removeMe);
+ synchronized (messagesToSendNow) {
+ ListIterator i=messagesToSendNow.listIterator();
+ while (i.hasNext()) {
+ MessageItem it=(MessageItem)i.next();
+ if (it.isForMessage(removeMe)) {
+ i.remove();
+ return true;
+ }
+ }
+ }
+ return false;
+ }
public long getMessageQueueLengthBytes() {
long x = 0;
@@ -1396,6 +1412,36 @@
}
/**
+ * Conceptually, send a message to this node IF it can be done within
'timeout', returing true
+ * only after the message was sent (similiar to sendSync), and false if
the message cannot be
+ * sent to the node in that time period. As an optimization, however,
this function may return
+ * immediately if it is determined that the message would not leave the
node within the timeout
+ * period.
+ */
+ public boolean conditionalSend(Message req, ByteCounter ctr, long
timeout) throws NotConnectedException {
+ if (timeout<=0)
+ return false;
+ if
(getMessageQueueLengthBytes()/(getThrottle().getBandwidth()+1.0) > timeout) {
+ Logger.normal(this, "conditionalSend; pre-emptively not
sending message ("+timeout+"ms): "+req);
+ return false;
+ }
+ SyncMessageCallback cb = new SyncMessageCallback();
+ sendAsync(req, cb, 0, ctr);
+ cb.waitForSend(timeout);
+ if (cb.done) {
+ return true;
+ } else {
+ //best-effort: remove the message from the send queue
it is ok if we can't prematurely
+ //remove the item (i.e. race condition / now it is
sent), but it will generate unclaimed messages, etc.
+ if (!maybeRemoveMessageFromQueue(req))
+ Logger.error(this, "unable to stop transmition
of request: "+req);
+ else
+ Logger.normal(this, "removed request from queue
for timeout: "+req);
+ return false;
+ }
+ }
+
+ /**
* Enqueue a message to be sent to this node and wait up to a minute for
it to be transmitted.
*/
public void sendSync(Message req, ByteCounter ctr) throws
NotConnectedException {
@@ -1426,7 +1472,7 @@
}
}
if(isConnected())
- Logger.error(this, "Waited too long for a
blocking send on " + this + " for " + PeerNode.this, new Exception("error"));
+ Logger.normal(this, "Waited too long for a
blocking send on " + this + " for " + PeerNode.this, new Exception("error"));
}
public void acknowledged() {
Modified: trunk/freenet/src/freenet/node/RequestSender.java
===================================================================
--- trunk/freenet/src/freenet/node/RequestSender.java 2008-01-07 19:31:24 UTC
(rev 16958)
+++ trunk/freenet/src/freenet/node/RequestSender.java 2008-01-07 20:07:30 UTC
(rev 16959)
@@ -4,6 +4,7 @@
package freenet.node;
import java.util.HashSet;
+import java.util.ArrayList;
import freenet.crypt.CryptFormatException;
import freenet.crypt.DSAPublicKey;
@@ -46,6 +47,8 @@
public final class RequestSender implements Runnable, ByteCounter {
// Constants
+ //SEND_TIMEOUT is not a hard timeout, shoot low for low latency
(250-500ms?).
+ static final int SEND_TIMEOUT = 1000;
static final int ACCEPTED_TIMEOUT = 5000;
static final int FETCH_TIMEOUT = 120000;
/** Wait up to this long to get a path folding reply */
@@ -141,6 +144,7 @@
int rejectOverloads=0;
HashSet nodesRoutedTo = new HashSet();
HashSet nodesNotIgnored = new HashSet();
+ ArrayList busyPeers = new ArrayList();
while(true) {
if(logMINOR) Logger.minor(this, "htl="+htl);
if(htl == 0) {
@@ -159,9 +163,24 @@
routeAttempts++;
// Route it
+ long sendTimeout = SEND_TIMEOUT;
+ boolean usingBusyPeer=false;
PeerNode next;
next = node.peers.closerPeer(source, nodesRoutedTo,
nodesNotIgnored, target, true, node.isAdvancedModeEnabled(), -1, null);
+ if (next == null && !busyPeers.isEmpty()) {
+ next = (PeerNode)busyPeers.remove(0);
+ usingBusyPeer=true;
+ if (logMINOR) Logger.minor(this, "trying
previously-found busy peer: "+next);
+ //NOTE: if we are at this point, it is already
presumed that the message cannot even make it off the node to this peer in
SEND_TIMEOUT, use all the timeout we have left.
+ sendTimeout =
FETCH_TIMEOUT-(System.currentTimeMillis()-startTime);
+ //Edge case, local request & we are running w/o
any time left.
+ if (sendTimeout < SEND_TIMEOUT && source==null)
{
+ if (logMINOR) Logger.minor(this,
"increasing timeout for local request");
+ sendTimeout = 2*SEND_TIMEOUT;
+ }
+ }
+
if(next == null) {
if (logMINOR && rejectOverloads>0)
Logger.minor(this, "no more peers, but
overloads ("+rejectOverloads+"/"+routeAttempts+" overloaded)");
@@ -187,11 +206,18 @@
// So take it from when we first started to try to send the
request.
// See comments below when handling FNPRecentlyFailed for why we
need this.
long timeSentRequest = System.currentTimeMillis();
-
+
try {
//This is the first contact to this node
//async is preferred, but makes ACCEPTED_TIMEOUT much more
likely for long send queues.
- next.sendAsync(req, null, 0, this);
+ //using conditionalSend this way might actually
approximate Q-routing load balancing accross the network.
+ if (!next.conditionalSend(req, this, sendTimeout)) {
+ if (usingBusyPeer)
+ continue;
+ Logger.normal(this, "will try this peer
later if no others are available");
+ busyPeers.add(next);
+ continue;
+ }
} catch (NotConnectedException e) {
Logger.minor(this, "Not connected");
continue;