Author: toad
Date: 2005-11-29 18:18:12 +0000 (Tue, 29 Nov 2005)
New Revision: 7631
Added:
trunk/freenet/src/freenet/node/NodePinger.java
Modified:
trunk/freenet/src/freenet/io/comm/DMT.java
trunk/freenet/src/freenet/io/xfer/BlockTransmitter.java
trunk/freenet/src/freenet/node/InsertHandler.java
trunk/freenet/src/freenet/node/InsertSender.java
trunk/freenet/src/freenet/node/Node.java
trunk/freenet/src/freenet/node/NodeDispatcher.java
trunk/freenet/src/freenet/node/PeerManager.java
trunk/freenet/src/freenet/node/PeerNode.java
trunk/freenet/src/freenet/node/RequestHandler.java
trunk/freenet/src/freenet/node/RequestSender.java
trunk/freenet/src/freenet/node/Version.java
trunk/freenet/src/freenet/support/LRUHashtable.java
Log:
245: (mandatory)
New, very simple load balancing and load limiting, based on an analogy to
TCP-over-Ethernet: simple randomized exponential backoff, plus the existing
TCP-like load limiting. Thanks, Ian.
Modified: trunk/freenet/src/freenet/io/comm/DMT.java
===================================================================
--- trunk/freenet/src/freenet/io/comm/DMT.java 2005-11-29 00:36:32 UTC (rev 7630)
+++ trunk/freenet/src/freenet/io/comm/DMT.java 2005-11-29 18:18:12 UTC (rev 7631)
@@ -78,6 +78,7 @@
public static final String BLOCK_HEADERS = "blockHeaders";
public static final String DATA_INSERT_REJECTED_REASON =
"dataInsertRejectedReason";
public static final String STREAM_SEQNO = "streamSequenceNumber";
+ public static final String IS_LOCAL = "isLocal";
//Diagnostic
public static final MessageType ping = new MessageType("ping") {{
@@ -528,14 +529,16 @@
// to source, and reduce send rate.
public static final MessageType FNPRejectedOverload = new
MessageType("FNPRejectOverload") {{
addField(UID, Long.class);
+ addField(IS_LOCAL, Boolean.class);
}};
- public static final Message createFNPRejectedOverload(long id) {
+ public static final Message createFNPRejectedOverload(long id, boolean
isLocal) {
Message msg = new Message(FNPRejectedOverload);
msg.set(UID, id);
+ msg.set(IS_LOCAL, isLocal);
return msg;
}
-
+
public static final MessageType FNPAccepted = new
MessageType("FNPAccepted") {{
addField(UID, Long.class);
}};
@@ -661,6 +664,26 @@
return msg;
}
+ public static final MessageType FNPLinkPing = new
MessageType("FNPLinkPing") {{
+ addField(PING_SEQNO, Long.class);
+ }};
+
+ public static final Message createFNPLinkPing(long seqNo) {
+ Message msg = new Message(FNPLinkPing);
+ msg.set(PING_SEQNO, seqNo);
+ return msg;
+ }
+
+ public static final MessageType FNPLinkPong = new
MessageType("FNPLinkPong") {{
+ addField(PING_SEQNO, Long.class);
+ }};
+
+ public static final Message createFNPLinkPong(long seqNo) {
+ Message msg = new Message(FNPLinkPong);
+ msg.set(PING_SEQNO, seqNo);
+ return msg;
+ }
+
public static final MessageType FNPPong = new MessageType("FNPPong") {{
addField(PING_SEQNO, Integer.class);
}};
Modified: trunk/freenet/src/freenet/io/xfer/BlockTransmitter.java
===================================================================
--- trunk/freenet/src/freenet/io/xfer/BlockTransmitter.java 2005-11-29 00:36:32 UTC (rev 7630)
+++ trunk/freenet/src/freenet/io/xfer/BlockTransmitter.java 2005-11-29 18:18:12 UTC (rev 7631)
@@ -258,4 +258,8 @@
public boolean failedDueToOverload() {
return failedByOverload;
}
+
+ public PeerContext getDestination() {
+ return _destination;
+ }
}
Modified: trunk/freenet/src/freenet/node/InsertHandler.java
===================================================================
--- trunk/freenet/src/freenet/node/InsertHandler.java 2005-11-29 00:36:32 UTC (rev 7630)
+++ trunk/freenet/src/freenet/node/InsertHandler.java 2005-11-29 18:18:12 UTC (rev 7631)
@@ -139,6 +139,8 @@
// also have a flag locally to indicate the receive failed.
// And if it does, we interrupt.
+ boolean receivedRejectedOverload = false;
+
while(true) {
synchronized(sender) {
try {
@@ -154,20 +156,29 @@
return;
}
+ if((!receivedRejectedOverload) &&
sender.receivedRejectedOverload()) {
+ // Forward it
+ Message m = DMT.createFNPRejectedOverload(uid, false);
+ source.send(m);
+ }
+
int status = sender.getStatus();
if(status == InsertSender.NOT_FINISHED) {
continue;
}
-
+
+ // Local RejectedOverloads (fatal).
// Internal error counts as overload. It'd only create a timeout
otherwise, which is the same thing anyway.
// We *really* need a good way to deal with nodes that constantly
R_O!
- if(status == InsertSender.REJECTED_OVERLOAD ||
+ if(status == InsertSender.TIMED_OUT ||
+ status == InsertSender.GENERATED_REJECTED_OVERLOAD ||
status == InsertSender.INTERNAL_ERROR) {
- msg = DMT.createFNPRejectedOverload(uid);
+ msg = DMT.createFNPRejectedOverload(uid, true);
source.send(msg);
// Might as well store it anyway.
- if(status == InsertSender.REJECTED_OVERLOAD)
+ if(status == InsertSender.TIMED_OUT ||
+ status ==
InsertSender.GENERATED_REJECTED_OVERLOAD)
canCommit = true;
return;
}
Modified: trunk/freenet/src/freenet/node/InsertSender.java
===================================================================
--- trunk/freenet/src/freenet/node/InsertSender.java 2005-11-29 00:36:32 UTC (rev 7630)
+++ trunk/freenet/src/freenet/node/InsertSender.java 2005-11-29 18:18:12 UTC (rev 7631)
@@ -74,11 +74,18 @@
private boolean sentRequest;
private int status = -1;
+ /** Still running */
static final int NOT_FINISHED = -1;
+ /** Successful insert */
static final int SUCCESS = 0;
+ /** Route not found */
static final int ROUTE_NOT_FOUND = 1;
- static final int REJECTED_OVERLOAD = 2;
+ /** Internal error */
static final int INTERNAL_ERROR = 3;
+ /** Timed out waiting for response */
+ static final int TIMED_OUT = 4;
+ /** Locally Generated a RejectedOverload */
+ static final int GENERATED_REJECTED_OVERLOAD = 5;
public String toString() {
return super.toString()+" for "+uid;
@@ -145,29 +152,66 @@
if(receiveFailed) return; // don't need to set status as killed by
InsertHandler
Message msg;
- try {
- msg = node.usm.waitFor(mf);
- } catch (DisconnectedException e) {
- Logger.normal(this, "Disconnected from "+next+" while waiting
for Accepted");
- continue;
- }
- if(receiveFailed) return; // don't need to set status as killed by
InsertHandler
- if(msg == null || msg.getSpec() == DMT.FNPRejectedOverload) {
- // Overload... hmmmm - propagate error back to source
- Logger.error(this, "Propagating "+msg+" back to source on
"+this);
- next.insertRejectedOverload();
- finish(REJECTED_OVERLOAD, next);
- return;
- }
+ /*
+ * Because messages may be re-ordered, it is
+ * entirely possible that we get a non-local RejectedOverload,
+ * followed by an Accepted. So we must loop here.
+ */
- if(msg.getSpec() == DMT.FNPRejectedLoop) {
- next.insertDidNotRejectOverload();
- // Loop - we don't want to send the data to this one
- continue;
- }
+ while (true) {
+
+ try {
+ msg = node.usm.waitFor(mf);
+ } catch (DisconnectedException e) {
+ Logger.normal(this, "Disconnected from
" + next
+ + " while waiting for
Accepted");
+ continue;
+ }
+
+ if (receiveFailed)
+ return; // don't need to set status as
killed by InsertHandler
+
+ if (msg == null) {
+ // Terminal overload
+ // Try to propagate back to source
+ next.localRejectedOverload();
+ finish(TIMED_OUT, next);
+ return;
+ }
+
+ if (msg.getSpec() == DMT.FNPRejectedOverload) {
+ // Non-fatal - probably still have time
left
+ if (msg.getBoolean(DMT.IS_LOCAL)) {
+ next.localRejectedOverload();
+ Logger
+ .minor(this,
+
"Local RejectedOverload, moving on to next peer");
+ // Give up on this one, try
another
+ break;
+ } else {
+ forwardRejectedOverload();
+ }
+ continue;
+ }
+
+ if (msg.getSpec() == DMT.FNPRejectedLoop) {
+ next.successNotOverload();
+ // Loop - we don't want to send the
data to this one
+ break;
+ }
+
+ if (msg.getSpec() != DMT.FNPAccepted) {
+ Logger.error(this,
+ "Unexpected message
waiting for Accepted: "
+ + msg);
+ break;
+ }
+ // Otherwise is an FNPAccepted
+ break;
+ }
- // Otherwise must be an Accepted
+ if(msg == null || msg.getSpec() != DMT.FNPAccepted) continue;
// Send them the data.
// Which might be the new data resulting from a collision...
@@ -208,95 +252,118 @@
senderThread.start();
senderThreads.add(senderThread);
blockSenders.add(bt);
-
- if(receiveFailed) return;
- try {
- msg = node.usm.waitFor(mf);
- } catch (DisconnectedException e) {
- Logger.normal(this, "Disconnected from "+next+" while waiting
for InsertReply on "+this);
- continue;
- }
- if(receiveFailed) return;
-
- if(msg == null) {
- // Timeout :(
- // Fairly serious problem
- Logger.error(this, "Timeout after Accepted in insert");
- // Treat as rejected-overload
- next.insertRejectedOverload();
- finish(REJECTED_OVERLOAD, next);
- return;
- }
-
- if(msg.getSpec() == DMT.FNPRejectedOverload || msg.getSpec() ==
DMT.FNPRejectedTimeout) {
- Logger.minor(this, "Rejected due to overload");
- next.insertRejectedOverload();
- finish(REJECTED_OVERLOAD, next);
- return;
- }
-
- if(msg.getSpec() == DMT.FNPRouteNotFound) {
- Logger.minor(this, "Rejected: RNF");
- short newHtl = msg.getShort(DMT.HTL);
- if(htl > newHtl) htl = newHtl;
- // Finished as far as this node is concerned
- next.insertDidNotRejectOverload();
- continue;
- }
-
- if(msg.getSpec() == DMT.FNPDataInsertRejected) {
- next.insertDidNotRejectOverload();
- short reason = msg.getShort(DMT.DATA_INSERT_REJECTED_REASON);
- Logger.minor(this, "DataInsertRejected: "+reason);
-
- if(reason == DMT.DATA_INSERT_REJECTED_VERIFY_FAILED) {
- if(fromStore) {
- // That's odd...
- Logger.error(this, "Verify failed on next node
"+next+" for DataInsert but we were sending from the store!");
- } else {
- try {
- if(!prb.allReceived())
- Logger.error(this, "Did not receive all
packets but next node says invalid anyway!");
- else {
- // Check the data
- new CHKBlock(prb.getBlock(), headers, myKey);
- Logger.error(this, "Verify failed on "+next+"
but data was valid!");
- }
- } catch (CHKVerifyException e) {
- Logger.normal(this, "Verify failed because data
was invalid");
- }
- }
- continue; // What else can we do?
- } else if(reason == DMT.DATA_INSERT_REJECTED_RECEIVE_FAILED) {
- if(receiveFailed) {
- Logger.minor(this, "Failed to receive data, so failed
to send data");
- } else {
- if(prb.allReceived()) {
- Logger.error(this, "Received all data but send
failed to "+next);
- } else {
- if(prb.isAborted()) {
- Logger.normal(this, "Send failed: aborted:
"+prb.getAbortReason()+": "+prb.getAbortDescription());
- } else
- Logger.normal(this, "Send failed; have not yet
received all data but not aborted: "+next);
- }
- }
- continue;
- }
-
- Logger.error(this, "DataInsert rejected!
Reason="+DMT.getDataInsertRejectedReason(reason));
-
- }
-
- if(msg.getSpec() != DMT.FNPInsertReply) {
- Logger.error(this, "Unknown reply: "+msg);
- finish(INTERNAL_ERROR, next);
- }
-
- // Our task is complete
- next.insertDidNotRejectOverload();
- finish(SUCCESS, next);
- return;
- }
+
+ while (true) {
+
+ if (receiveFailed)
+ return;
+
+ try {
+ msg = node.usm.waitFor(mf);
+ } catch (DisconnectedException e) {
+ Logger.normal(this, "Disconnected from
" + next
+ + " while waiting for
InsertReply on " + this);
+ break;
+ }
+ if (receiveFailed)
+ return;
+
+ if (msg == null || msg.getSpec() ==
DMT.FNPRejectedTimeout) {
+ // Timeout :(
+ // Fairly serious problem
+ Logger.error(this, "Timeout (" + msg
+ + ") after Accepted in
insert");
+ // Terminal overload
+ // Try to propagate back to source
+ next.localRejectedOverload();
+ finish(TIMED_OUT, next);
+ return;
+ }
+
+ if (msg.getSpec() == DMT.FNPRejectedOverload) {
+ // Probably non-fatal, if so, we have
time left, can try next one
+ if (msg.getBoolean(DMT.IS_LOCAL)) {
+ next.localRejectedOverload();
+ Logger.minor(this,
+ "Local
RejectedOverload, moving on to next peer");
+ // Give up on this one, try
another
+ break;
+ } else {
+ forwardRejectedOverload();
+ }
+ continue; // Wait for any further
response
+ }
+
+ if (msg.getSpec() == DMT.FNPRouteNotFound) {
+ Logger.minor(this, "Rejected: RNF");
+ short newHtl = msg.getShort(DMT.HTL);
+ if (htl > newHtl)
+ htl = newHtl;
+ // Finished as far as this node is
concerned
+ next.successNotOverload();
+ break;
+ }
+
+ if (msg.getSpec() == DMT.FNPDataInsertRejected)
{
+ next.successNotOverload();
+ short reason = msg
+
.getShort(DMT.DATA_INSERT_REJECTED_REASON);
+ Logger.minor(this, "DataInsertRejected:
" + reason);
+ if (reason ==
DMT.DATA_INSERT_REJECTED_VERIFY_FAILED) {
+ if (fromStore) {
+ // That's odd...
+
Logger.error(this,"Verify failed on next node "
+ + next
+ " for DataInsert but we were sending from the store!");
+ } else {
+ try {
+ if
(!prb.allReceived())
+
Logger.error(this,
+
"Did not receive all packets but next node says invalid anyway!");
+ else {
+ //
Check the data
+ new
CHKBlock(prb.getBlock(), headers,
+
myKey);
+
Logger.error(this,
+
"Verify failed on " + next
+
+ " but data was valid!");
+ }
+ } catch
(CHKVerifyException e) {
+ Logger
+
.normal(this,
+
"Verify failed because data was invalid");
+ }
+ }
+ break; // What else can we do?
+ } else if (reason ==
DMT.DATA_INSERT_REJECTED_RECEIVE_FAILED) {
+ if (receiveFailed) {
+ Logger.minor(this,
"Failed to receive data, so failed to send data");
+ } else {
+ if (prb.allReceived()) {
+
Logger.error(this, "Received all data but send failed to " + next);
+ } else {
+ if
(prb.isAborted()) {
+
Logger.normal(this, "Send failed: aborted: " + prb.getAbortReason() + ": " +
prb.getAbortDescription());
+ } else
+
Logger.normal(this, "Send failed; have not yet received all data but not
aborted: " + next);
+ }
+ }
+ break;
+ }
+ Logger.error(this, "DataInsert
rejected! Reason="
+ +
DMT.getDataInsertRejectedReason(reason));
+ }
+
+ if (msg.getSpec() != DMT.FNPInsertReply) {
+ Logger.error(this, "Unknown reply: " +
msg);
+ finish(INTERNAL_ERROR, next);
+ }
+
+ // Our task is complete
+ next.successNotOverload();
+ finish(SUCCESS, next);
+ return;
+ }
+ }
} catch (Throwable t) {
Logger.error(this, "Caught "+t, t);
if(status == NOT_FINISHED)
@@ -306,21 +373,22 @@
node.removeInsertSender(myKey, origHTL, this);
}
}
-
- /**
- * Wait until we have a terminal status code.
- */
- public synchronized void waitUntilFinished() {
- while(true) {
- if(status != NOT_FINISHED) return;
- try {
- wait(10000);
- } catch (InterruptedException e) {
- // Ignore
- }
- }
+
+ private boolean hasForwardedRejectedOverload = false;
+
+ synchronized boolean receivedRejectedOverload() {
+ return hasForwardedRejectedOverload;
}
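+ /** Flag that a RejectedOverload should be forwarded to the request originator, and wake any threads waiting on this sender. Only the first call has any effect.
+ * DO NOT CALL if we have a *local* RejectedOverload.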
+ */
+ private synchronized void forwardRejectedOverload() {
+ if(hasForwardedRejectedOverload) return;
+ hasForwardedRejectedOverload = true;
+ notifyAll();
+ }
+
private void finish(int code, PeerNode next) {
Logger.minor(this, "Finished: "+code+" on "+this, new
Exception("debug"));
if(status != NOT_FINISHED)
@@ -330,30 +398,13 @@
BlockTransmitter bt = (BlockTransmitter) i.next();
bt.waitForComplete();
if(bt.failedDueToOverload() && (status == SUCCESS || status ==
ROUTE_NOT_FOUND)) {
- status = REJECTED_OVERLOAD;
+ forwardRejectedOverload();
+ ((PeerNode)bt.getDestination()).localRejectedOverload();
break;
}
}
- for(Iterator i = senderThreads.iterator();i.hasNext();) {
- Thread senderThread = (Thread) i.next();
- while(senderThread.isAlive()) {
- try {
- senderThread.join();
- } catch (InterruptedException e) {
- // Ignore
- }
- }
- }
-
status = code;
- if(sentRequest) {
- if(status == REJECTED_OVERLOAD) {
- node.getInsertThrottle().requestRejectedOverload();
- } else if(status == SUCCESS || status == ROUTE_NOT_FOUND) {
-
node.getInsertThrottle().requestCompleted(System.currentTimeMillis() -
startTime);
- }
- }
synchronized(this) {
notifyAll();
@@ -384,12 +435,18 @@
return "SUCCESS";
if(status == ROUTE_NOT_FOUND)
return "ROUTE NOT FOUND";
- if(status == REJECTED_OVERLOAD)
- return "REJECTED: OVERLOAD";
if(status == NOT_FINISHED)
return "NOT FINISHED";
if(status == INTERNAL_ERROR)
return "INTERNAL ERROR";
+ if(status == TIMED_OUT)
+ return "TIMED OUT";
+ if(status == GENERATED_REJECTED_OVERLOAD)
+ return "GENERATED REJECTED OVERLOAD";
return "UNKNOWN STATUS CODE: "+status;
}
+
+ public boolean sentRequest() {
+ return sentRequest;
+ }
}
Modified: trunk/freenet/src/freenet/node/Node.java
===================================================================
--- trunk/freenet/src/freenet/node/Node.java 2005-11-29 00:36:32 UTC (rev 7630)
+++ trunk/freenet/src/freenet/node/Node.java 2005-11-29 18:18:12 UTC (rev 7631)
@@ -80,8 +80,8 @@
public static final boolean DONT_CACHE_LOCAL_REQUESTS = true;
public static final int PACKETS_IN_BLOCK = 32;
public static final int PACKET_SIZE = 1024;
- public static final double DECREMENT_AT_MIN_PROB = 0.2;
- public static final double DECREMENT_AT_MAX_PROB = 0.1;
+ public static final double DECREMENT_AT_MIN_PROB = 0.25;
+ public static final double DECREMENT_AT_MAX_PROB = 0.5;
// Send keepalives every 2.5-5.0 seconds
public static final int KEEPALIVE_INTERVAL = 2500;
// If no activity for 30 seconds, node is dead
@@ -95,6 +95,10 @@
public static final int RANDOMIZED_TIME_BETWEEN_VERSION_PROBES =
HANDSHAKE_TIMEOUT*2; // 20-30 secs
// If we don't receive any packets at all in this period, from any node,
tell the user
public static final long ALARM_TIME = 60*1000;
+ /** Maximum overall average ping time. If ping is greater than this,
+ * we reject all requests.
+ */
+ public static final long MAX_PING_TIME = 1000;
// 900ms
static final int MIN_INTERVAL_BETWEEN_INCOMING_SWAP_REQUESTS = 900;
@@ -133,6 +137,7 @@
final FNPPacketMangler packetMangler;
final PacketSender ps;
final NodeDispatcher dispatcher;
+ final NodePinger nodePinger;
final String filenamesPrefix;
final FilenameGenerator tempFilenameGenerator;
static short MAX_HTL = 10;
@@ -365,6 +370,7 @@
System.exit(EXIT_TEMP_INIT_ERROR);
throw new Error();
}
+ nodePinger = new NodePinger(this);
tempBucketFactory = new
PaddedEphemerallyEncryptedBucketFactory(new
TempBucketFactory(tempFilenameGenerator), random, 1024);
archiveManager = new ArchiveManager(MAX_ARCHIVE_HANDLERS,
MAX_CACHED_ARCHIVE_DATA, MAX_ARCHIVE_SIZE, MAX_ARCHIVED_FILE_SIZE,
MAX_CACHED_ELEMENTS, random, tempFilenameGenerator);
requestThrottle = new RequestThrottle(5000, 2.0F);
@@ -401,6 +407,7 @@
* Either it succeeds or it doesn't.
*/
ClientCHKBlock realGetCHK(ClientCHK key, boolean localOnly, boolean cache)
throws LowLevelGetException {
+ long startTime = System.currentTimeMillis();
Object o = makeRequestSender(key.getNodeCHK(), MAX_HTL,
random.nextLong(), null, lm.loc.getValue(), localOnly, cache);
if(o instanceof CHKBlock) {
try {
@@ -414,36 +421,65 @@
throw new
LowLevelGetException(LowLevelGetException.DATA_NOT_FOUND_IN_STORE);
}
RequestSender rs = (RequestSender)o;
- rs.waitUntilFinished();
- if(rs.getStatus() == RequestSender.SUCCESS) {
- try {
- return new ClientCHKBlock(rs.getPRB().getBlock(),
rs.getHeaders(), key, true);
- } catch (CHKVerifyException e) {
- Logger.error(this, "Does not verify: "+e, e);
- throw new
LowLevelGetException(LowLevelGetException.DECODE_FAILED);
- } catch (AbortedException e) {
- Logger.error(this, "Impossible: "+e, e);
- throw new
LowLevelGetException(LowLevelGetException.INTERNAL_ERROR);
- }
- } else {
- switch(rs.getStatus()) {
- case RequestSender.NOT_FINISHED:
- Logger.error(this, "RS still running in getCHK!: "+rs);
- throw new
LowLevelGetException(LowLevelGetException.INTERNAL_ERROR);
- case RequestSender.DATA_NOT_FOUND:
- throw new
LowLevelGetException(LowLevelGetException.DATA_NOT_FOUND);
- case RequestSender.REJECTED_OVERLOAD:
- throw new
LowLevelGetException(LowLevelGetException.REJECTED_OVERLOAD);
- case RequestSender.ROUTE_NOT_FOUND:
- throw new
LowLevelGetException(LowLevelGetException.ROUTE_NOT_FOUND);
- case RequestSender.TRANSFER_FAILED:
- throw new
LowLevelGetException(LowLevelGetException.TRANSFER_FAILED);
- case RequestSender.VERIFY_FAILURE:
- throw new
LowLevelGetException(LowLevelGetException.VERIFY_FAILED);
- default:
- Logger.error(this, "Unknown RequestSender code in
getCHK: "+rs.getStatus()+" on "+rs);
- throw new
LowLevelGetException(LowLevelGetException.INTERNAL_ERROR);
+ boolean rejectedOverload = false;
+ while(true) {
+ if(rs.waitUntilStatusChange() && (!rejectedOverload)) {
+ requestThrottle.requestRejectedOverload();
+ rejectedOverload = true;
}
+
+ int status = rs.getStatus();
+
+ if(status == RequestSender.NOT_FINISHED)
+ continue;
+
+ if(status == RequestSender.TIMED_OUT ||
+ status ==
RequestSender.GENERATED_REJECTED_OVERLOAD) {
+ if(!rejectedOverload) {
+ requestThrottle.requestRejectedOverload();
+ rejectedOverload = true;
+ }
+ } else {
+ if(status == RequestSender.DATA_NOT_FOUND ||
+ status == RequestSender.SUCCESS ||
+ status == RequestSender.ROUTE_NOT_FOUND
||
+ status == RequestSender.VERIFY_FAILURE)
{
+ long rtt = System.currentTimeMillis() -
startTime;
+ insertThrottle.requestCompleted(rtt);
+ }
+ }
+
+ if(rs.getStatus() == RequestSender.SUCCESS) {
+ try {
+ return new
ClientCHKBlock(rs.getPRB().getBlock(), rs.getHeaders(), key, true);
+ } catch (CHKVerifyException e) {
+ Logger.error(this, "Does not verify: "+e, e);
+ throw new
LowLevelGetException(LowLevelGetException.DECODE_FAILED);
+ } catch (AbortedException e) {
+ Logger.error(this, "Impossible: "+e, e);
+ throw new
LowLevelGetException(LowLevelGetException.INTERNAL_ERROR);
+ }
+ } else {
+ switch(rs.getStatus()) {
+ case RequestSender.NOT_FINISHED:
+ Logger.error(this, "RS still running in
getCHK!: "+rs);
+ throw new
LowLevelGetException(LowLevelGetException.INTERNAL_ERROR);
+ case RequestSender.DATA_NOT_FOUND:
+ throw new
LowLevelGetException(LowLevelGetException.DATA_NOT_FOUND);
+ case RequestSender.ROUTE_NOT_FOUND:
+ throw new
LowLevelGetException(LowLevelGetException.ROUTE_NOT_FOUND);
+ case RequestSender.TRANSFER_FAILED:
+ throw new
LowLevelGetException(LowLevelGetException.TRANSFER_FAILED);
+ case RequestSender.VERIFY_FAILURE:
+ throw new
LowLevelGetException(LowLevelGetException.VERIFY_FAILED);
+ case RequestSender.GENERATED_REJECTED_OVERLOAD:
+ case RequestSender.TIMED_OUT:
+ throw new
LowLevelGetException(LowLevelGetException.REJECTED_OVERLOAD);
+ default:
+ Logger.error(this, "Unknown RequestSender code
in getCHK: "+rs.getStatus()+" on "+rs);
+ throw new
LowLevelGetException(LowLevelGetException.INTERNAL_ERROR);
+ }
+ }
}
}
@@ -459,6 +495,7 @@
long uid = random.nextLong();
if(!lockUID(uid))
Logger.error(this, "Could not lock UID just randomly generated:
"+uid+" - probably indicates broken PRNG");
+ long startTime = System.currentTimeMillis();
synchronized(this) {
if(cache) {
try {
@@ -470,35 +507,64 @@
is = makeInsertSender(block.getClientKey().getNodeCHK(),
MAX_HTL, uid, null, headers, prb, false,
lm.getLocation().getValue(), cache);
}
- is.waitUntilFinished();
- if(is.getStatus() == InsertSender.SUCCESS) {
- Logger.normal(this, "Succeeded inserting "+block);
- } else {
- int status = is.getStatus();
- String msg = "Failed inserting "+block+" : "+is.getStatusString();
- if(status == InsertSender.ROUTE_NOT_FOUND)
- msg += " - this is normal on small networks; the data will
still be propagated, but it can't find the 20+ nodes needed for full success";
- if(is.getStatus() != InsertSender.ROUTE_NOT_FOUND)
- Logger.error(this, msg);
- else
- Logger.normal(this, msg);
- switch(is.getStatus()) {
- case InsertSender.NOT_FINISHED:
- Logger.error(this, "IS still running in putCHK!: "+is);
- throw new
LowLevelPutException(LowLevelPutException.INTERNAL_ERROR);
- case InsertSender.REJECTED_OVERLOAD:
- throw new
LowLevelPutException(LowLevelPutException.REJECTED_OVERLOAD);
- case InsertSender.ROUTE_NOT_FOUND:
- throw new
LowLevelPutException(LowLevelPutException.ROUTE_NOT_FOUND);
- case InsertSender.INTERNAL_ERROR:
- throw new
LowLevelPutException(LowLevelPutException.INTERNAL_ERROR);
- default:
- Logger.error(this, "Unknown InsertSender code in
putCHK: "+is.getStatus()+" on "+is);
- throw new
LowLevelPutException(LowLevelPutException.INTERNAL_ERROR);
- }
+ boolean hasForwardedRejectedOverload = false;
+ while(true) {
+ synchronized(is) {
+ if(is.getStatus() == InsertSender.NOT_FINISHED) {
+ try {
+ is.wait(5*1000);
+ } catch (InterruptedException e) {
+ // Ignore
+ }
+ }
+ if((!hasForwardedRejectedOverload) &&
is.receivedRejectedOverload()) {
+ insertThrottle.requestRejectedOverload();
+ }
+ if(is.getStatus() == InsertSender.NOT_FINISHED)
continue;
+ }
+ // Finished?
+ if(!hasForwardedRejectedOverload) {
+ // Is it ours? Did we send a request?
+ if(is.sentRequest() && is.uid == uid && (is.getStatus()
== InsertSender.ROUTE_NOT_FOUND || is.getStatus() == InsertSender.SUCCESS)) {
+ // It worked!
+ long endTime = System.currentTimeMillis();
+ long len = endTime - startTime;
+ insertThrottle.requestCompleted(len);
+ }
+ }
+ if(is.getStatus() == InsertSender.SUCCESS) {
+ Logger.normal(this, "Succeeded inserting "+block);
+ } else {
+ int status = is.getStatus();
+ String msg = "Failed inserting "+block+" :
"+is.getStatusString();
+ if(status == InsertSender.ROUTE_NOT_FOUND)
+ msg += " - this is normal on small networks;
the data will still be propagated, but it can't find the 20+ nodes needed for
full success";
+ if(is.getStatus() != InsertSender.ROUTE_NOT_FOUND)
+ Logger.error(this, msg);
+ else
+ Logger.normal(this, msg);
+ switch(is.getStatus()) {
+ case InsertSender.NOT_FINISHED:
+ Logger.error(this, "IS still running in
putCHK!: "+is);
+ throw new
LowLevelPutException(LowLevelPutException.INTERNAL_ERROR);
+ case InsertSender.GENERATED_REJECTED_OVERLOAD:
+ throw new
LowLevelPutException(LowLevelPutException.REJECTED_OVERLOAD);
+ case InsertSender.ROUTE_NOT_FOUND:
+ throw new
LowLevelPutException(LowLevelPutException.ROUTE_NOT_FOUND);
+ case InsertSender.INTERNAL_ERROR:
+ throw new
LowLevelPutException(LowLevelPutException.INTERNAL_ERROR);
+ default:
+ Logger.error(this, "Unknown InsertSender code
in putCHK: "+is.getStatus()+" on "+is);
+ throw new
LowLevelPutException(LowLevelPutException.INTERNAL_ERROR);
+ }
+ }
}
}
+ public boolean shouldRejectRequest() {
+ return nodePinger.averagePingTime() > MAX_PING_TIME;
+ }
+
/**
* Export my reference so that another node can connect to me.
* @return
Modified: trunk/freenet/src/freenet/node/NodeDispatcher.java
===================================================================
--- trunk/freenet/src/freenet/node/NodeDispatcher.java 2005-11-29 00:36:32 UTC (rev 7630)
+++ trunk/freenet/src/freenet/node/NodeDispatcher.java 2005-11-29 18:18:12 UTC (rev 7631)
@@ -74,6 +74,19 @@
return handleDataRequest(m);
} else if(spec == DMT.FNPInsertRequest) {
return handleInsertRequest(m);
+ } else if(spec == DMT.FNPLinkPing) {
+ long id = m.getLong(DMT.PING_SEQNO);
+ Message msg = DMT.createFNPLinkPong(id);
+ try {
+ source.sendAsync(msg, null);
+ } catch (NotConnectedException e) {
+ // Ignore
+ }
+ return true;
+ } else if(spec == DMT.FNPLinkPong) {
+ long id = m.getLong(DMT.PING_SEQNO);
+ source.receivedLinkPong(id);
+ return true;
}
return false;
}
Added: trunk/freenet/src/freenet/node/NodePinger.java
===================================================================
--- trunk/freenet/src/freenet/node/NodePinger.java 2005-11-29 00:36:32 UTC (rev 7630)
+++ trunk/freenet/src/freenet/node/NodePinger.java 2005-11-29 18:18:12 UTC (rev 7631)
@@ -0,0 +1,63 @@
+package freenet.node;
+
+import freenet.support.Logger;
+
+public class NodePinger implements Runnable {
+
+ private double meanPing = 0;
+
+ NodePinger(Node n) {
+ this.node = n;
+ Thread t = new Thread(this, "Node pinger");
+ t.setDaemon(true);
+ t.start();
+ }
+
+ final Node node;
+
+ public void run() {
+ while(true) {
+ try {
+ Thread.sleep(200);
+ } catch (InterruptedException e) {
+ // Ignore
+ }
+ PeerNode[] peers = node.peers.connectedPeers;
+ if(peers == null) continue;
+ recalculateMean(peers);
+ for(int i=0;i<peers.length;i++) {
+ PeerNode pn = peers[i];
+ if(!pn.isConnected())
+ continue;
+ pn.sendPing();
+ recalculateMean(peers);
+ try {
+ Thread.sleep(200);
+ } catch (InterruptedException e) {
+ // Ignore
+ }
+ }
+ }
+ }
+
+ /** Recalculate the mean ping time (geometric mean over the currently connected peers) */
+ private void recalculateMean(PeerNode[] peers) {
+ int peerCount = 0;
+ double total = 1.0;
+ for(int i=0;i<peers.length;i++) {
+ PeerNode peer = peers[i];
+ if(!peer.isConnected()) continue;
+ peerCount++;
+ total *= peer.averagePingTime();
+ }
+ if(peerCount > 0) {
+ total = Math.pow(total, 1.0 / peerCount);
+ meanPing = total;
+ Logger.minor(this, "Mean ping: "+meanPing+"ms");
+ }
+ }
+
+ public double averagePingTime() {
+ return meanPing;
+ }
+}
Modified: trunk/freenet/src/freenet/node/PeerManager.java
===================================================================
--- trunk/freenet/src/freenet/node/PeerManager.java 2005-11-29 00:36:32 UTC (rev 7630)
+++ trunk/freenet/src/freenet/node/PeerManager.java 2005-11-29 18:18:12 UTC (rev 7631)
@@ -126,6 +126,7 @@
System.arraycopy(connectedPeers, 0, newConnectedPeers, 0,
connectedPeers.length);
newConnectedPeers[connectedPeers.length] = pn;
connectedPeers = newConnectedPeers;
+ Logger.minor(this, "Connected peers: "+connectedPeers.length);
}
// NodePeer route(double targetLocation, RoutingContext ctx) {
@@ -300,20 +301,11 @@
if(routedTo.contains(p)) continue;
if(p == pn) continue;
if(!p.isConnected()) continue;
+ if(p.isBackedOff()) continue;
count++;
any = p;
- if(!notIgnored.contains(p)) {
- //double pRO = p.getAdjustedPRejectedOverload();
- double pRO = p.getOtherBiasProbability();
- double random = node.random.nextDouble();
- if(random < pRO) {
- Logger.minor(this, "Ignoring "+p+": pRO="+pRO+",
random="+random);
- routedTo.add(p);
- continue;
- } else notIgnored.add(p);
- }
double diff = distance(p, loc);
- Logger.minor(this, "p.loc="+p.getLocation().getValue()+",
loc="+loc+", d="+distance(p.getLocation().getValue(), loc)+" usedD="+diff+",
bias="+p.getBias());
+ Logger.minor(this, "p.loc="+p.getLocation().getValue()+",
loc="+loc+", d="+distance(p.getLocation().getValue(), loc)+" usedD="+diff);
if((!ignoreSelf) && diff > maxDiff) continue;
if(diff < bestDiff) {
best = p;
Modified: trunk/freenet/src/freenet/node/PeerNode.java
===================================================================
--- trunk/freenet/src/freenet/node/PeerNode.java 2005-11-29 00:36:32 UTC
(rev 7630)
+++ trunk/freenet/src/freenet/node/PeerNode.java 2005-11-29 18:18:12 UTC
(rev 7631)
@@ -27,6 +27,7 @@
import freenet.io.comm.PeerParseException;
import freenet.support.Fields;
import freenet.support.HexUtil;
+import freenet.support.LRUHashtable;
import freenet.support.Logger;
import freenet.support.SimpleFieldSet;
import freenet.support.math.BootstrappingDecayingRunningAverage;
@@ -271,6 +272,8 @@
pDataRequestRejectOverload = new SimpleRunningAverage(100, 0.05);
pInsertRejectOverload = new SimpleRunningAverage(100, 0.05);
pRejectOverload = new SimpleRunningAverage(100, 0.05);
+ pingNumber = node.random.nextLong();
+ pingAverage = new SimpleRunningAverage(20, 1);
}
private void randomizeMaxTimeBetweenPacketSends() {
@@ -861,9 +864,7 @@
public String getStatus() {
return
- (isConnected ? "CONNECTED " : "DISCONNECTED") + " " +
getPeer().toString()+" "+myName+" "+currentLocation.getValue()+" "+getVersion()
+
- " ob="+this.getOtherBiasProbability()+" ("+otherBiasValue+")
"+/*" adjpRO="+this.getAdjustedPRejectedOverload()+*//*" bias="+getBias()+*/"
reqs: pRO="+pDataRequestRejectOverload.currentValue()+"
(h="+pDataRequestRejectOverload.countReports()+") ins: pRO="+
pInsertRejectOverload.currentValue()+
- " (h="+pInsertRejectOverload.countReports()+")";
+ (isConnected ? "CONNECTED " : "DISCONNECTED") + " " +
getPeer().toString()+" "+myName+" "+currentLocation.getValue()+"
"+getVersion()+" backoff: "+backoffLength+" ("+(Math.max(backedOffUntil -
System.currentTimeMillis(),0))+")";
}
public String getVersion(){
@@ -951,105 +952,106 @@
return hashCode;
}
- int otherBiasValue = 0;
-
- /**
- * Record the fact that the node rejected a request due to
- * overload (or timed out etc).
- */
- public void rejectedOverload() {
- pRejectOverload.report(1.0);
- pDataRequestRejectOverload.report(1.0);
- increaseBias();
+ public void throttledSend(Message message, long maxWaitTime) throws NotConnectedException, ThrottledPacketLagException {
+ node.globalThrottle.sendPacket(message, this, maxWaitTime);
}
- public void insertRejectedOverload() {
- pRejectOverload.report(1.0);
- pInsertRejectOverload.report(1.0);
- increaseBias();
- }
+ private final Object backoffSync = new Object();
- private void increaseBias() {
- synchronized(biasLock) {
- if(biasValue < 1.0) biasValue = 1.0;
- biasValue += BIAS_SENSITIVITY / BIAS_TARGET;
- otherBiasValue += 20;
+ public boolean isBackedOff() {
+ synchronized(backoffSync) {
+ if(System.currentTimeMillis() < backedOffUntil) {
+ Logger.minor(this, "Is backed off");
+ return true;
+ } else return false;
}
}
-
- private void decreaseBias() {
- synchronized(biasLock) {
- biasValue -= BIAS_SENSITIVITY;
- if(biasValue < 1.0) biasValue = 1.0;
- otherBiasValue -= 1;
+
+ long backedOffUntil = -1;
+ /** Initial nominal backoff length */
+ final int INITIAL_BACKOFF_LENGTH = 5000;
+ /** Double every time */
+ final int BACKOFF_MULTIPLIER = 2;
+ /** Maximum: 24 hours */
+ final int MAX_BACKOFF_LENGTH = 24*60*60*1000;
+ /** Current nominal backoff length */
+ int backoffLength = INITIAL_BACKOFF_LENGTH;
+
+ /**
+ * Got a local RejectedOverload.
+ * Back off this node for a while.
+ */
+ public void localRejectedOverload() {
+ synchronized(backoffSync) {
+ backoffLength = backoffLength * BACKOFF_MULTIPLIER;
+ if(backoffLength > MAX_BACKOFF_LENGTH)
+ backoffLength = MAX_BACKOFF_LENGTH;
+ backedOffUntil = System.currentTimeMillis() + node.random.nextInt(backoffLength);
}
}
/**
- * Record the fact that the node did not reject a request
- * due to overload.
+ * Didn't get RejectedOverload.
+ * Reset backoff.
*/
- public void didNotRejectOverload() {
- pRejectOverload.report(0.0);
- pDataRequestRejectOverload.report(0.0);
- decreaseBias();
+ public void successNotOverload() {
+ synchronized(backoffSync) {
+ backoffLength = INITIAL_BACKOFF_LENGTH;
+ }
}
- public void insertDidNotRejectOverload() {
- pRejectOverload.report(0.0);
- pInsertRejectOverload.report(0.0);
- decreaseBias();
- }
+ Object pingSync = new Object();
+ final static int MAX_PINGS = 10;
+ final LRUHashtable pingsSentTimes = new LRUHashtable();
+ long pingNumber;
+ final RunningAverage pingAverage;
- public double getPRejectedOverload() {
- return pDataRequestRejectOverload.currentValue();
+ public void sendPing() {
+ long pingNo;
+ long now = System.currentTimeMillis();
+ Long lPingNo;
+ synchronized(pingSync) {
+ pingNo = pingNumber++;
+ lPingNo = new Long(pingNo);
+ Long lnow = new Long(now);
+ pingsSentTimes.push(lPingNo, lnow);
+ Logger.minor(this, "Pushed "+lPingNo+" "+lnow);
+ while(pingsSentTimes.size() > MAX_PINGS) {
+ Long l = (Long) pingsSentTimes.popValue();
+ Logger.minor(this, "pingsSentTimes.size()="+pingsSentTimes.size()+", l="+l);
+ long tStarted = l.longValue();
+ pingAverage.report(now - tStarted);
+ Logger.minor(this, "Reporting dumped ping time to "+this+" : "+(now - tStarted));
+ }
+ }
+ Message msg = DMT.createFNPLinkPing(pingNo);
+ try {
+ sendAsync(msg, null);
+ } catch (NotConnectedException e) {
+ synchronized(pingSync) {
+ pingsSentTimes.removeKey(lPingNo);
+ }
+ }
}
-
- public double getAdjustedPRejectedOverload() {
- double d = pRejectOverload.currentValue();
- long hits = pRejectOverload.countReports();
- hits = Math.min(hits, 100);
- double max = ((double) hits) / ((double) (hits + 1));
- if(hits < 25) return 0.0;
- return Math.min(d, max);
- }
-
- public double getPInsertRejectedOverload() {
- return pInsertRejectOverload.currentValue();
- }
- /**
- * Return the bias value for routing for this node.
- * The idea is simply that if a node is overloaded,
- * its specialization shrinks.
- * Essentially this is 1.0-P(RejectedOverload or timeout).
- */
- public double getBias() {
- synchronized(biasLock) {
- return biasValue;
+ public void receivedLinkPong(long id) {
+ Long lid = new Long(id);
+ long startTime;
+ synchronized(pingSync) {
+ Long s = (Long) pingsSentTimes.get(lid);
+ if(s == null) {
+ Logger.normal(this, "Dropping ping "+id+" on "+this);
+ return;
+ }
+ startTime = s.longValue();
+ pingsSentTimes.removeKey(lid);
+ long now = System.currentTimeMillis();
+ pingAverage.report(now - startTime);
+ Logger.minor(this, "Reporting ping time to "+this+" : "+(now - startTime));
}
-// double pSummaryFailure = pRejectOverload.currentValue();
-// long hits = pRejectOverload.countReports();
-// if(hits > 10) {
-// double max = ((double) hits) / ((double) (hits+1));
-// double denom = 1.0 - pSummaryFailure;
-// if(denom == 0.0) denom = 0.000001;
-// return denom;
-// } else {
-// return 1.0;
-// }
}
- public void throttledSend(Message message, long maxWaitTime) throws NotConnectedException, ThrottledPacketLagException {
- node.globalThrottle.sendPacket(message, this, maxWaitTime);
+ public double averagePingTime() {
+ return pingAverage.currentValue();
}
-
- public double getOtherBiasProbability() {
- synchronized(biasLock) {
- double d = otherBiasValue / 100.0;
- if(d < 0) d = 0.0;
- d += 1.0;
- return 1.0 - (1.0 / d);
- }
- }
}
Modified: trunk/freenet/src/freenet/node/RequestHandler.java
===================================================================
--- trunk/freenet/src/freenet/node/RequestHandler.java 2005-11-29 00:36:32 UTC (rev 7630)
+++ trunk/freenet/src/freenet/node/RequestHandler.java 2005-11-29 18:18:12 UTC (rev 7631)
@@ -74,7 +74,11 @@
while(true) {
- rs.waitUntilStatusChange();
+ if(rs.waitUntilStatusChange()) {
+ // Forward RejectedOverload
+ Message msg = DMT.createFNPRejectedOverload(uid, false);
+ source.sendAsync(msg, null);
+ }
if(rs.transferStarted()) {
Message df = DMT.createFNPDataFound(uid, rs.getHeaders());
@@ -95,9 +99,11 @@
Message dnf = DMT.createFNPDataNotFound(uid);
source.sendAsync(dnf, null);
return;
- case RequestSender.REJECTED_OVERLOAD:
+ case RequestSender.GENERATED_REJECTED_OVERLOAD:
+ case RequestSender.TIMED_OUT:
+ // Locally generated.
// Propagate back to source who needs to reduce send rate
- Message reject = DMT.createFNPRejectedOverload(uid);
+ Message reject = DMT.createFNPRejectedOverload(uid, true);
source.sendAsync(reject, null);
return;
case RequestSender.ROUTE_NOT_FOUND:
Modified: trunk/freenet/src/freenet/node/RequestSender.java
===================================================================
--- trunk/freenet/src/freenet/node/RequestSender.java 2005-11-29 00:36:32 UTC (rev 7630)
+++ trunk/freenet/src/freenet/node/RequestSender.java 2005-11-29 18:18:12 UTC (rev 7631)
@@ -40,7 +40,6 @@
final long uid;
final Node node;
private double nearestLoc;
- private final long startTime;
/** The source of this request if any - purely so we can avoid routing to it */
final PeerNode source;
private PartiallyReceivedBlock prb = null;
@@ -54,10 +53,11 @@
static final int NOT_FINISHED = -1;
static final int SUCCESS = 0;
static final int ROUTE_NOT_FOUND = 1;
- static final int REJECTED_OVERLOAD = 2;
static final int DATA_NOT_FOUND = 3;
static final int TRANSFER_FAILED = 4;
static final int VERIFY_FAILURE = 5;
+ static final int TIMED_OUT = 6;
+ static final int GENERATED_REJECTED_OVERLOAD = 7;
@@ -67,7 +67,6 @@
public RequestSender(NodeCHK key, short htl, long uid, Node n, double nearestLoc,
PeerNode source) {
- startTime = System.currentTimeMillis();
this.key = key;
this.htl = htl;
this.uid = uid;
@@ -123,140 +122,182 @@
Message req = DMT.createFNPDataRequest(uid, htl, key, nearestLoc);
- /**
- * What are we waiting for?
- * FNPAccepted - continue
- * FNPRejectedLoop - go to another node
- * FNPRejectedOverload - fail (propagates back to source,
- * then reduces source transmit rate)
- */
- MessageFilter mfAccepted = MessageFilter.create().setSource(next).setField(DMT.UID, uid).setTimeout(ACCEPTED_TIMEOUT).setType(DMT.FNPAccepted);
- MessageFilter mfRejectedLoop = MessageFilter.create().setSource(next).setField(DMT.UID, uid).setTimeout(ACCEPTED_TIMEOUT).setType(DMT.FNPRejectedLoop);
- MessageFilter mfRejectedOverload = MessageFilter.create().setSource(next).setField(DMT.UID, uid).setTimeout(ACCEPTED_TIMEOUT).setType(DMT.FNPRejectedOverload);
-
- // mfRejectedOverload must be the last thing in the or
- // So its or pointer remains null
- // Otherwise we need to recreate it below
- MessageFilter mf = mfAccepted.or(mfRejectedLoop.or(mfRejectedOverload));
-
next.send(req);
sentRequest = true;
- Message msg;
- try {
- msg = node.usm.waitFor(mf);
- } catch (DisconnectedException e) {
- Logger.normal(this, "Disconnected from "+next+" while waiting for Accepted on "+uid);
- continue;
- }
+ Message msg = null;
- if(msg == null) {
- // Timeout
- // Treat as FNPRejectOverloadd
- next.rejectedOverload();
- finish(REJECTED_OVERLOAD, next);
- return;
+ while(true) {
+
+ /**
+ * What are we waiting for?
+ * FNPAccepted - continue
+ * FNPRejectedLoop - go to another node
+ * FNPRejectedOverload - fail (propagates back to source,
+ * then reduces source transmit rate)
+ */
+
+ MessageFilter mfAccepted = MessageFilter.create().setSource(next).setField(DMT.UID, uid).setTimeout(ACCEPTED_TIMEOUT).setType(DMT.FNPAccepted);
+ MessageFilter mfRejectedLoop = MessageFilter.create().setSource(next).setField(DMT.UID, uid).setTimeout(ACCEPTED_TIMEOUT).setType(DMT.FNPRejectedLoop);
+ MessageFilter mfRejectedOverload = MessageFilter.create().setSource(next).setField(DMT.UID, uid).setTimeout(ACCEPTED_TIMEOUT).setType(DMT.FNPRejectedOverload);
+
+ // mfRejectedOverload must be the last thing in the or
+ // So its or pointer remains null
+ // Otherwise we need to recreate it below
+ MessageFilter mf = mfAccepted.or(mfRejectedLoop.or(mfRejectedOverload));
+
+ try {
+ msg = node.usm.waitFor(mf);
+ } catch (DisconnectedException e) {
+ Logger.normal(this, "Disconnected from "+next+" while waiting for Accepted on "+uid);
+ break;
+ }
+
+ if(msg == null) {
+ Logger.minor(this, "Timeout waiting for Accepted");
+ // Timeout waiting for Accepted
+ next.localRejectedOverload();
+ forwardRejectedOverload();
+ // Try next node
+ break;
+ }
+
+ if(msg.getSpec() == DMT.FNPRejectedLoop) {
+ Logger.minor(this, "Rejected loop");
+ next.successNotOverload();
+ // Find another node to route to
+ break;
+ }
+
+ if(msg.getSpec() == DMT.FNPRejectedOverload) {
+ Logger.minor(this, "Rejected: overload");
+ // Non-fatal - probably still have time left
+ forwardRejectedOverload();
+ if (msg.getBoolean(DMT.IS_LOCAL)) {
+ Logger.minor(this, "Is local");
+ next.localRejectedOverload();
+ Logger.minor(this, "Local RejectedOverload, moving on to next peer");
+ // Give up on this one, try another
+ break;
+ }
+ continue;
+ }
+
+ if(msg.getSpec() != DMT.FNPAccepted) {
+ Logger.error(this, "Unrecognized message: "+msg);
+ continue;
+ }
+
+ break;
}
- if(msg.getSpec() == DMT.FNPRejectedLoop) {
- next.didNotRejectOverload();
- // Find another node to route to
- continue;
+ if(msg == null || msg.getSpec() != DMT.FNPAccepted) {
+ // Try another node
+ continue;
}
+
+ Logger.minor(this, "Got Accepted");
- if(msg.getSpec() == DMT.FNPRejectedOverload) {
- // Failed. Propagate back to source.
- // Source will reduce send rate.
- next.rejectedOverload();
- finish(REJECTED_OVERLOAD, next);
- return;
- }
-
// Otherwise, must be Accepted
// So wait...
- MessageFilter mfDNF = MessageFilter.create().setSource(next).setField(DMT.UID, uid).setTimeout(FETCH_TIMEOUT).setType(DMT.FNPDataNotFound);
- MessageFilter mfDF = MessageFilter.create().setSource(next).setField(DMT.UID, uid).setTimeout(FETCH_TIMEOUT).setType(DMT.FNPDataFound);
- MessageFilter mfRouteNotFound = MessageFilter.create().setSource(next).setField(DMT.UID, uid).setTimeout(FETCH_TIMEOUT).setType(DMT.FNPRouteNotFound);
- mfRejectedOverload = mfRejectedOverload.setTimeout(FETCH_TIMEOUT);
- mf = mfDNF.or(mfDF.or(mfRouteNotFound.or(mfRejectedOverload)));
+ while(true) {
+
+ MessageFilter mfDNF = MessageFilter.create().setSource(next).setField(DMT.UID, uid).setTimeout(FETCH_TIMEOUT).setType(DMT.FNPDataNotFound);
+ MessageFilter mfDF = MessageFilter.create().setSource(next).setField(DMT.UID, uid).setTimeout(FETCH_TIMEOUT).setType(DMT.FNPDataFound);
+ MessageFilter mfRouteNotFound = MessageFilter.create().setSource(next).setField(DMT.UID, uid).setTimeout(FETCH_TIMEOUT).setType(DMT.FNPRouteNotFound);
+ MessageFilter mfRejectedOverload = MessageFilter.create().setSource(next).setField(DMT.UID, uid).setTimeout(FETCH_TIMEOUT).setType(DMT.FNPRejectedOverload);
+ MessageFilter mf = mfDNF.or(mfDF.or(mfRouteNotFound.or(mfRejectedOverload)));
- try {
- msg = node.usm.waitFor(mf);
- } catch (DisconnectedException e) {
- Logger.normal(this, "Disconnected from "+next+" while waiting for data on "+uid);
- continue;
- }
-
- if(msg == null) {
- // Timeout. Treat as FNPRejectOverload.
- finish(REJECTED_OVERLOAD, next);
- return;
- }
-
- if(msg.getSpec() == DMT.FNPDataNotFound) {
- next.didNotRejectOverload();
- finish(DATA_NOT_FOUND, next);
- return;
- }
-
- if(msg.getSpec() == DMT.FNPRouteNotFound) {
- // Backtrack within available hops
- short newHtl = msg.getShort(DMT.HTL);
- if(newHtl < htl) htl = newHtl;
- next.didNotRejectOverload();
- continue;
- }
-
- if(msg.getSpec() == DMT.FNPRejectedOverload) {
- next.rejectedOverload();
- finish(REJECTED_OVERLOAD, next);
- return;
- }
+ try {
+ msg = node.usm.waitFor(mf);
+ } catch (DisconnectedException e) {
+ Logger.normal(this, "Disconnected from "+next+" while waiting for data on "+uid);
+ continue;
+ }
+
+ if(msg == null) {
+ // Fatal timeout
+ next.localRejectedOverload();
+ forwardRejectedOverload();
+ finish(TIMED_OUT, next);
+ return;
+ }
+
+ if(msg.getSpec() == DMT.FNPDataNotFound) {
+ next.successNotOverload();
+ finish(DATA_NOT_FOUND, next);
+ return;
+ }
+
+ if(msg.getSpec() == DMT.FNPRouteNotFound) {
+ // Backtrack within available hops
+ short newHtl = msg.getShort(DMT.HTL);
+ if(newHtl < htl) htl = newHtl;
+ next.successNotOverload();
+ continue;
+ }
+
+ if(msg.getSpec() == DMT.FNPRejectedOverload) {
+ // Non-fatal - probably still have time left
+ forwardRejectedOverload();
+ if (msg.getBoolean(DMT.IS_LOCAL)) {
+ next.localRejectedOverload();
+ Logger.minor(this, "Local RejectedOverload, moving on to next peer");
+ // Give up on this one, try another
+ break;
+ }
+ continue; // Wait for any further response
+ }
- // Found data
- next.didNotRejectOverload();
-
- // First get headers
-
- headers = ((ShortBuffer)msg.getObject(DMT.BLOCK_HEADERS)).getData();
-
- // FIXME: Validate headers
-
- node.addTransferringSender(key, this);
- try {
-
- prb = new PartiallyReceivedBlock(Node.PACKETS_IN_BLOCK, Node.PACKET_SIZE);
-
- synchronized(this) {
- notifyAll();
- }
-
- BlockReceiver br = new BlockReceiver(node.usm, next, uid, prb);
-
- try {
- byte[] data = br.receive();
- // Received data
- CHKBlock block;
- try {
- block = new CHKBlock(data, headers, key);
- } catch (CHKVerifyException e1) {
- Logger.normal(this, "Got data but verify failed: "+e1, e1);
- finish(VERIFY_FAILURE, next);
- return;
- }
- node.store(block);
- finish(SUCCESS, next);
- return;
- } catch (RetrievalException e) {
- Logger.normal(this, "Transfer failed: "+e, e);
- finish(TRANSFER_FAILED, next);
- return;
- }
- } finally {
- node.removeTransferringSender(key, this);
+ if(msg.getSpec() != DMT.FNPDataFound) {
+ Logger.error(this, "Unexpected message: "+msg);
+ }
+
+ // Found data
+ next.successNotOverload();
+
+ // First get headers
+
+ headers = ((ShortBuffer)msg.getObject(DMT.BLOCK_HEADERS)).getData();
+
+ // FIXME: Validate headers
+
+ node.addTransferringSender(key, this);
+ try {
+
+ prb = new PartiallyReceivedBlock(Node.PACKETS_IN_BLOCK, Node.PACKET_SIZE);
+
+ synchronized(this) {
+ notifyAll();
+ }
+
+ BlockReceiver br = new BlockReceiver(node.usm, next, uid, prb);
+
+ try {
+ byte[] data = br.receive();
+ // Received data
+ CHKBlock block;
+ try {
+ block = new CHKBlock(data, headers, key);
+ } catch (CHKVerifyException e1) {
+ Logger.normal(this, "Got data but verify failed: "+e1, e1);
+ finish(VERIFY_FAILURE, next);
+ return;
+ }
+ node.store(block);
+ finish(SUCCESS, next);
+ return;
+ } catch (RetrievalException e) {
+ Logger.normal(this, "Transfer failed: "+e, e);
+ finish(TRANSFER_FAILED, next);
+ return;
+ }
+ } finally {
+ node.removeTransferringSender(key, this);
+ }
}
}
} catch (Throwable t) {
@@ -267,6 +308,15 @@
}
}
+ private volatile boolean hasForwardedRejectedOverload;
+
+ /** Forward RejectedOverload to the request originator */
+ private synchronized void forwardRejectedOverload() {
+ if(hasForwardedRejectedOverload) return;
+ hasForwardedRejectedOverload = true;
+ notifyAll();
+ }
+
public PartiallyReceivedBlock getPRB() {
return prb;
}
@@ -275,14 +325,21 @@
return prb != null;
}
+ boolean hadROLastTimeWaited = false;
+
/**
* Wait until either the transfer has started or we have a
* terminal status code.
+ * @return True if we got a RejectedOverload.
*/
- public synchronized void waitUntilStatusChange() {
+ public synchronized boolean waitUntilStatusChange() {
while(true) {
- if(prb != null) return;
- if(status != NOT_FINISHED) return;
+ if((!hadROLastTimeWaited) && hasForwardedRejectedOverload) {
+ hadROLastTimeWaited = true;
+ return true;
+ }
+ if(prb != null) return false;
+ if(status != NOT_FINISHED) return false;
try {
wait(10000);
} catch (InterruptedException e) {
@@ -311,14 +368,6 @@
throw new IllegalStateException("finish() called with "+code+" when was already "+status);
status = code;
- if(sentRequest) {
- if(status == REJECTED_OVERLOAD) {
- node.getRequestThrottle().requestRejectedOverload();
- } else if(status == SUCCESS || status == ROUTE_NOT_FOUND || status == DATA_NOT_FOUND || status == VERIFY_FAILURE) {
- node.getRequestThrottle().requestCompleted(System.currentTimeMillis() - startTime);
- }
- }
-
synchronized(this) {
notifyAll();
}
Modified: trunk/freenet/src/freenet/node/Version.java
===================================================================
--- trunk/freenet/src/freenet/node/Version.java 2005-11-29 00:36:32 UTC (rev 7630)
+++ trunk/freenet/src/freenet/node/Version.java 2005-11-29 18:18:12 UTC (rev 7631)
@@ -20,10 +20,10 @@
public static final String protocolVersion = "1.0";
/** The build number of the current revision */
- public static final int buildNumber = 244;
+ public static final int buildNumber = 245;
/** Oldest build of Fred we will talk to */
- public static final int lastGoodBuild = 244;
+ public static final int lastGoodBuild = 245;
/** The highest reported build of fred */
public static int highestSeenBuild = buildNumber;
Modified: trunk/freenet/src/freenet/support/LRUHashtable.java
===================================================================
--- trunk/freenet/src/freenet/support/LRUHashtable.java 2005-11-29 00:36:32 UTC (rev 7630)
+++ trunk/freenet/src/freenet/support/LRUHashtable.java 2005-11-29 18:18:12 UTC (rev 7631)
@@ -30,6 +30,7 @@
insert.value = value;
list.remove(insert);
}
+ Logger.minor(this, "Pushed "+insert);
list.unshift(insert);
}
@@ -109,8 +110,13 @@
public Object obj;
public Object value;
- public QItem(Object obj, Object key) {
- this.obj = obj;
+ public QItem(Object key, Object val) {
+ this.obj = key;
+ this.value = val;
}
+
+ public String toString() {
+ return super.toString()+": "+obj+" "+value;
+ }
}
}