Author: toad
Date: 2005-11-17 20:59:49 +0000 (Thu, 17 Nov 2005)
New Revision: 7547
Added:
trunk/freenet/src/freenet/node/ChainedRequestThrottle.java
trunk/freenet/src/freenet/node/QueuedDataRequest.java
trunk/freenet/src/freenet/node/QueuedInsertRequest.java
trunk/freenet/src/freenet/node/QueuedRequest.java
trunk/freenet/src/freenet/node/QueueingSimpleLowLevelClient.java
trunk/freenet/src/freenet/node/RequestStarter.java
trunk/freenet/src/freenet/node/RequestStarterClient.java
trunk/freenet/src/freenet/node/RequestThrottle.java
Modified:
trunk/freenet/src/freenet/client/Fetcher.java
trunk/freenet/src/freenet/client/FetcherContext.java
trunk/freenet/src/freenet/client/FileInserter.java
trunk/freenet/src/freenet/client/HighLevelSimpleClient.java
trunk/freenet/src/freenet/client/HighLevelSimpleClientImpl.java
trunk/freenet/src/freenet/client/InserterContext.java
trunk/freenet/src/freenet/io/comm/DMT.java
trunk/freenet/src/freenet/io/comm/RetrievalException.java
trunk/freenet/src/freenet/io/xfer/BlockReceiver.java
trunk/freenet/src/freenet/io/xfer/BlockTransmitter.java
trunk/freenet/src/freenet/node/InsertHandler.java
trunk/freenet/src/freenet/node/InsertSender.java
trunk/freenet/src/freenet/node/Node.java
trunk/freenet/src/freenet/node/RealNodeRequestInsertTest.java
trunk/freenet/src/freenet/node/RequestSender.java
trunk/freenet/src/freenet/node/SimpleLowLevelClient.java
trunk/freenet/src/freenet/node/TextModeClientInterface.java
trunk/freenet/src/freenet/node/Version.java
trunk/freenet/src/freenet/support/UpdatableSortedLinkedList.java
Log:
184 (mandatory):
Implement client-node-level load limiting.
Re-enable FEC.
Modified: trunk/freenet/src/freenet/client/Fetcher.java
===================================================================
--- trunk/freenet/src/freenet/client/Fetcher.java 2005-11-16 19:29:35 UTC
(rev 7546)
+++ trunk/freenet/src/freenet/client/Fetcher.java 2005-11-17 20:59:49 UTC
(rev 7547)
@@ -109,7 +109,7 @@
// Do the fetch
KeyBlock block;
try {
- block = ctx.client.getKey(key, ctx.localRequestOnly);
+ block = ctx.client.getKey(key, ctx.localRequestOnly,
ctx.starterClient);
} catch (LowLevelGetException e) {
switch(e.code) {
case LowLevelGetException.DATA_NOT_FOUND:
Modified: trunk/freenet/src/freenet/client/FetcherContext.java
===================================================================
--- trunk/freenet/src/freenet/client/FetcherContext.java 2005-11-16
19:29:35 UTC (rev 7546)
+++ trunk/freenet/src/freenet/client/FetcherContext.java 2005-11-17
20:59:49 UTC (rev 7547)
@@ -2,6 +2,7 @@
import freenet.client.events.ClientEventProducer;
import freenet.crypt.RandomSource;
+import freenet.node.RequestStarterClient;
import freenet.node.SimpleLowLevelClient;
import freenet.support.BucketFactory;
@@ -34,6 +35,7 @@
final int maxMetadataSize;
final int maxDataBlocksPerSegment;
final int maxCheckBlocksPerSegment;
+ final RequestStarterClient starterClient;
public FetcherContext(SimpleLowLevelClient client, long curMaxLength,
@@ -43,7 +45,7 @@
boolean allowSplitfiles, boolean followRedirects,
boolean localRequestOnly,
int maxDataBlocksPerSegment, int
maxCheckBlocksPerSegment,
RandomSource random, ArchiveManager archiveManager,
BucketFactory bucketFactory,
- ClientEventProducer producer) {
+ ClientEventProducer producer, RequestStarterClient
starter) {
this.client = client;
this.maxOutputLength = curMaxLength;
this.maxTempLength = curMaxTempLength;
@@ -64,6 +66,7 @@
this.eventProducer = producer;
this.maxDataBlocksPerSegment = maxDataBlocksPerSegment;
this.maxCheckBlocksPerSegment = maxCheckBlocksPerSegment;
+ this.starterClient = starter;
}
public FetcherContext(FetcherContext ctx, int maskID) {
@@ -88,6 +91,7 @@
this.eventProducer = ctx.eventProducer;
this.maxDataBlocksPerSegment = 0;
this.maxCheckBlocksPerSegment = 0;
+ this.starterClient = ctx.starterClient;
} else if(maskID == SPLITFILE_DEFAULT_MASK) {
this.client = ctx.client;
this.maxOutputLength = ctx.maxOutputLength;
@@ -109,6 +113,7 @@
this.eventProducer = ctx.eventProducer;
this.maxDataBlocksPerSegment =
ctx.maxDataBlocksPerSegment;
this.maxCheckBlocksPerSegment =
ctx.maxCheckBlocksPerSegment;
+ this.starterClient = ctx.starterClient;
} else if(maskID == SPLITFILE_USE_LENGTHS_MASK) {
this.client = ctx.client;
this.maxOutputLength = ctx.maxOutputLength;
@@ -130,6 +135,7 @@
this.eventProducer = ctx.eventProducer;
this.maxDataBlocksPerSegment =
ctx.maxDataBlocksPerSegment;
this.maxCheckBlocksPerSegment =
ctx.maxCheckBlocksPerSegment;
+ this.starterClient = ctx.starterClient;
} else throw new IllegalArgumentException();
}
Modified: trunk/freenet/src/freenet/client/FileInserter.java
===================================================================
--- trunk/freenet/src/freenet/client/FileInserter.java 2005-11-16 19:29:35 UTC
(rev 7546)
+++ trunk/freenet/src/freenet/client/FileInserter.java 2005-11-17 20:59:49 UTC
(rev 7547)
@@ -122,7 +122,7 @@
try {
ctx.eventProducer.produceEvent(new
SimpleBlockPutEvent(chk.getClientKey()));
if(!getCHKOnly)
- ctx.client.putCHK(chk);
+ ctx.client.putCHK(chk, ctx.starterClient);
} catch (LowLevelPutException e) {
le = e;
}
Modified: trunk/freenet/src/freenet/client/HighLevelSimpleClient.java
===================================================================
--- trunk/freenet/src/freenet/client/HighLevelSimpleClient.java 2005-11-16
19:29:35 UTC (rev 7546)
+++ trunk/freenet/src/freenet/client/HighLevelSimpleClient.java 2005-11-17
20:59:49 UTC (rev 7547)
@@ -2,6 +2,7 @@
import freenet.client.events.ClientEventListener;
import freenet.keys.FreenetURI;
+import freenet.node.RequestStarterClient;
public interface HighLevelSimpleClient {
Modified: trunk/freenet/src/freenet/client/HighLevelSimpleClientImpl.java
===================================================================
--- trunk/freenet/src/freenet/client/HighLevelSimpleClientImpl.java
2005-11-16 19:29:35 UTC (rev 7546)
+++ trunk/freenet/src/freenet/client/HighLevelSimpleClientImpl.java
2005-11-17 20:59:49 UTC (rev 7547)
@@ -6,6 +6,7 @@
import freenet.client.events.SimpleEventProducer;
import freenet.crypt.RandomSource;
import freenet.keys.FreenetURI;
+import freenet.node.RequestStarterClient;
import freenet.node.SimpleLowLevelClient;
import freenet.support.BucketFactory;
import freenet.support.Logger;
@@ -21,6 +22,8 @@
private long curMaxTempLength;
private int curMaxMetadataLength;
private final RandomSource random;
+ private final RequestStarterClient requestStarter;
+ private final RequestStarterClient insertStarter;
static final int MAX_RECURSION = 10;
static final int MAX_ARCHIVE_RESTARTS = 2;
static final boolean DONT_ENTER_IMPLICIT_ARCHIVES = true;
@@ -50,7 +53,7 @@
static final int SPLITFILE_CHECK_BLOCKS_PER_SEGMENT = 192;
- public HighLevelSimpleClientImpl(SimpleLowLevelClient client,
ArchiveManager mgr, BucketFactory bf, RandomSource r) {
+ public HighLevelSimpleClientImpl(SimpleLowLevelClient client,
ArchiveManager mgr, BucketFactory bf, RandomSource r, RequestStarterClient
requestStarterClient, RequestStarterClient insertStarterClient) {
this.client = client;
archiveManager = mgr;
bucketFactory = bf;
@@ -60,6 +63,8 @@
curMaxLength = Long.MAX_VALUE;
curMaxTempLength = Long.MAX_VALUE;
curMaxMetadataLength = 1024 * 1024;
+ this.requestStarter = requestStarterClient;
+ this.insertStarter = insertStarterClient;
}
public void setMaxLength(long maxLength) {
@@ -80,14 +85,14 @@
SPLITFILE_THREADS, SPLITFILE_BLOCK_RETRIES,
NON_SPLITFILE_RETRIES,
FETCH_SPLITFILES, FOLLOW_REDIRECTS,
LOCAL_REQUESTS_ONLY,
MAX_SPLITFILE_BLOCKS_PER_SEGMENT,
MAX_SPLITFILE_CHECK_BLOCKS_PER_SEGMENT,
- random, archiveManager, bucketFactory,
globalEventProducer);
+ random, archiveManager, bucketFactory,
globalEventProducer, requestStarter);
Fetcher f = new Fetcher(uri, context);
return f.run();
}
public FreenetURI insert(InsertBlock insert, boolean getCHKOnly) throws
InserterException {
InserterContext context = new InserterContext(client,
bucketFactory, random, SPLITFILE_INSERT_RETRIES,
- SPLITFILE_INSERT_THREADS,
SPLITFILE_BLOCKS_PER_SEGMENT, SPLITFILE_CHECK_BLOCKS_PER_SEGMENT,
globalEventProducer);
+ SPLITFILE_INSERT_THREADS,
SPLITFILE_BLOCKS_PER_SEGMENT, SPLITFILE_CHECK_BLOCKS_PER_SEGMENT,
globalEventProducer, insertStarter);
FileInserter i = new FileInserter(context);
return i.run(insert, false, getCHKOnly);
}
Modified: trunk/freenet/src/freenet/client/InserterContext.java
===================================================================
--- trunk/freenet/src/freenet/client/InserterContext.java 2005-11-16
19:29:35 UTC (rev 7546)
+++ trunk/freenet/src/freenet/client/InserterContext.java 2005-11-17
20:59:49 UTC (rev 7547)
@@ -2,6 +2,7 @@
import freenet.client.events.ClientEventProducer;
import freenet.crypt.RandomSource;
+import freenet.node.RequestStarterClient;
import freenet.node.SimpleLowLevelClient;
import freenet.support.BucketFactory;
@@ -19,21 +20,22 @@
final int splitfileSegmentDataBlocks;
final int splitfileSegmentCheckBlocks;
final ClientEventProducer eventProducer;
+ final RequestStarterClient starterClient;
public InserterContext(SimpleLowLevelClient client, BucketFactory bf,
RandomSource random,
int maxRetries, int maxThreads, int
splitfileSegmentDataBlocks, int splitfileSegmentCheckBlocks,
- ClientEventProducer eventProducer) {
+ ClientEventProducer eventProducer, RequestStarterClient
sctx) {
this.client = client;
this.bf = bf;
this.random = random;
dontCompress = false;
- //splitfileAlgorithm = Metadata.SPLITFILE_ONION_STANDARD;
- splitfileAlgorithm = Metadata.SPLITFILE_NONREDUNDANT;
+ splitfileAlgorithm = Metadata.SPLITFILE_ONION_STANDARD;
this.maxInsertBlockRetries = maxRetries;
this.maxSplitInsertThreads = maxThreads;
this.eventProducer = eventProducer;
this.splitfileSegmentDataBlocks = splitfileSegmentDataBlocks;
this.splitfileSegmentCheckBlocks = splitfileSegmentCheckBlocks;
+ this.starterClient = sctx;
}
}
Modified: trunk/freenet/src/freenet/io/comm/DMT.java
===================================================================
--- trunk/freenet/src/freenet/io/comm/DMT.java 2005-11-16 19:29:35 UTC (rev
7546)
+++ trunk/freenet/src/freenet/io/comm/DMT.java 2005-11-17 20:59:49 UTC (rev
7547)
@@ -354,13 +354,15 @@
public static final MessageType sendAborted = new
MessageType("sendAborted") {{
addField(UID, Long.class);
- addField(REASON, String.class);
+ addField(DESCRIPTION, String.class);
+ addField(REASON, Integer.class);
}};
- public static final Message createSendAborted(long uid, String reason) {
+ public static final Message createSendAborted(long uid, int reason,
String description) {
Message msg = new Message(sendAborted);
msg.set(UID, uid);
msg.set(REASON, reason);
+ msg.set(DESCRIPTION, description);
return msg;
}
Modified: trunk/freenet/src/freenet/io/comm/RetrievalException.java
===================================================================
--- trunk/freenet/src/freenet/io/comm/RetrievalException.java 2005-11-16
19:29:35 UTC (rev 7546)
+++ trunk/freenet/src/freenet/io/comm/RetrievalException.java 2005-11-17
20:59:49 UTC (rev 7547)
@@ -38,6 +38,7 @@
public static final int TIMED_OUT = 4;
public static final int ALREADY_CACHED = 6;
public static final int SENDER_DISCONNECTED = 7;
+ public static final int NO_DATAINSERT = 8;
int _reason;
String _cause = null;
Modified: trunk/freenet/src/freenet/io/xfer/BlockReceiver.java
===================================================================
--- trunk/freenet/src/freenet/io/xfer/BlockReceiver.java 2005-11-16
19:29:35 UTC (rev 7546)
+++ trunk/freenet/src/freenet/io/xfer/BlockReceiver.java 2005-11-17
20:59:49 UTC (rev 7547)
@@ -57,6 +57,10 @@
_usm = usm;
}
+ public void sendAborted(int reason, String desc) throws
NotConnectedException {
+ _usm.send(_sender, DMT.createSendAborted(_uid, reason, desc));
+ }
+
public byte[] receive() throws RetrievalException {
int consecutiveMissingPacketReports = 0;
try {
Modified: trunk/freenet/src/freenet/io/xfer/BlockTransmitter.java
===================================================================
--- trunk/freenet/src/freenet/io/xfer/BlockTransmitter.java 2005-11-16
19:29:35 UTC (rev 7546)
+++ trunk/freenet/src/freenet/io/xfer/BlockTransmitter.java 2005-11-17
20:59:49 UTC (rev 7547)
@@ -65,6 +65,10 @@
}
}
+ public void sendAborted(int reason, String desc) throws
NotConnectedException {
+ _usm.send(_destination, DMT.createSendAborted(_uid, reason,
desc));
+ }
+
public boolean send() {
final PacketThrottle throttle =
PacketThrottle.getThrottle(_destination.getPeer(), _prb._packetSize);
_receiverThread = Thread.currentThread();
@@ -133,7 +137,7 @@
public void receiveAborted(int reason, String
description) {
try {
-
((PeerNode)_destination).sendAsync(DMT.createSendAborted(reason, description),
null);
+
((PeerNode)_destination).sendAsync(DMT.createSendAborted(_uid, reason,
description), null);
} catch (NotConnectedException e) {
Logger.minor(this, "Receive aborted and receiver is not
connected");
}
Added: trunk/freenet/src/freenet/node/ChainedRequestThrottle.java
===================================================================
--- trunk/freenet/src/freenet/node/ChainedRequestThrottle.java 2005-11-16
19:29:35 UTC (rev 7546)
+++ trunk/freenet/src/freenet/node/ChainedRequestThrottle.java 2005-11-17
20:59:49 UTC (rev 7547)
@@ -0,0 +1,22 @@
+package freenet.node;
+
+/**
+ * RequestThrottle which takes a second throttle, and never
+ * returns a delay less than that throttle's current delay.
+ */
+public class ChainedRequestThrottle extends RequestThrottle {
+
+ private final RequestThrottle otherThrottle;
+
+ public ChainedRequestThrottle(int rtt, float winsz, RequestThrottle
other) {
+ super(rtt, winsz);
+ otherThrottle = other;
+ }
+
+ public long getDelay() {
+ long delay = super.getDelay();
+ long otherDelay = otherThrottle.getDelay();
+ return Math.max(delay, otherDelay);
+ }
+
+}
Modified: trunk/freenet/src/freenet/node/InsertHandler.java
===================================================================
--- trunk/freenet/src/freenet/node/InsertHandler.java 2005-11-16 19:29:35 UTC
(rev 7546)
+++ trunk/freenet/src/freenet/node/InsertHandler.java 2005-11-17 20:59:49 UTC
(rev 7547)
@@ -90,6 +90,10 @@
Logger.error(this, "Did not receive DataInsert on "+uid+" from
"+source+" !");
Message tooSlow = DMT.createFNPRejectedTimeout(uid);
source.sendAsync(tooSlow, null);
+ prb = new PartiallyReceivedBlock(Node.PACKETS_IN_BLOCK,
Node.PACKET_SIZE);
+ br = new BlockReceiver(node.usm, source, uid, prb);
+ prb.abort(RetrievalException.NO_DATAINSERT, "No DataInsert");
+ br.sendAborted(RetrievalException.NO_DATAINSERT, "No DataInsert");
return;
}
Modified: trunk/freenet/src/freenet/node/InsertSender.java
===================================================================
--- trunk/freenet/src/freenet/node/InsertSender.java 2005-11-16 19:29:35 UTC
(rev 7546)
+++ trunk/freenet/src/freenet/node/InsertSender.java 2005-11-17 20:59:49 UTC
(rev 7547)
@@ -15,7 +15,14 @@
public final class InsertSender implements Runnable {
- InsertSender(NodeCHK myKey, long uid, byte[] headers, short htl,
+ public class Sender implements Runnable {
+
+ public void run() {
+ bt.send();
+ }
+ }
+
+ InsertSender(NodeCHK myKey, long uid, byte[] headers, short htl,
PeerNode source, Node node, PartiallyReceivedBlock prb, boolean
fromStore, double closestLocation) {
this.myKey = myKey;
this.target = myKey.toNormalizedDouble();
@@ -27,6 +34,7 @@
this.prb = prb;
this.fromStore = fromStore;
this.closestLocation = closestLocation;
+ this.startTime = System.currentTimeMillis();
Thread t = new Thread(this, "InsertSender for UID "+uid+" on
"+node.portNumber+" at "+System.currentTimeMillis());
t.setDaemon(true);
t.start();
@@ -48,6 +56,10 @@
final boolean fromStore;
private boolean receiveFailed = false;
final double closestLocation;
+ final long startTime;
+ private BlockTransmitter bt;
+ private Sender s;
+ private Thread senderThread;
private int status = -1;
static final int NOT_FINISHED = -1;
@@ -110,8 +122,8 @@
// mfRejectedOverload must be the last thing in the or
// So its or pointer remains null
// Otherwise we need to recreate it below
+ mfRejectedOverload.clearOr();
MessageFilter mf =
mfAccepted.or(mfRejectedLoop.or(mfRejectedOverload));
- mfRejectedOverload.clearOr();
// Send to next node
@@ -127,9 +139,11 @@
}
if(receiveFailed) return; // don't need to set status as killed by
InsertHandler
- if(msg == null) {
- // No response, move on
- continue;
+ if(msg == null || msg.getSpec() == DMT.FNPRejectedOverload) {
+ // Overload... hmmmm - propagate error back to source
+ Logger.error(this, "Propagating "+msg+" back to source on
"+this);
+ finish(REJECTED_OVERLOAD);
+ return;
}
if(msg.getSpec() == DMT.FNPRejectedLoop) {
@@ -137,19 +151,11 @@
continue;
}
- if(msg.getSpec() == DMT.FNPRejectedOverload) {
- // Overload... hmmmm - propagate error back to source
- Logger.error(this, "Propagating "+msg+" back to source on
"+this);
- finish(REJECTED_OVERLOAD);
- return;
- }
-
// Otherwise must be an Accepted
// Send them the data.
// Which might be the new data resulting from a collision...
- BlockTransmitter bt;
Message dataInsert;
PartiallyReceivedBlock prbNow;
prbNow = prb;
@@ -168,10 +174,12 @@
MessageFilter mfRNF =
MessageFilter.create().setSource(next).setField(DMT.UID,
uid).setTimeout(PUT_TIMEOUT).setType(DMT.FNPRouteNotFound);
MessageFilter mfInsertReply =
MessageFilter.create().setSource(next).setField(DMT.UID,
uid).setTimeout(PUT_TIMEOUT).setType(DMT.FNPInsertReply);
mfRejectedOverload.setTimeout(PUT_TIMEOUT);
+ mfRejectedOverload.clearOr();
MessageFilter mfRouteNotFound =
MessageFilter.create().setSource(next).setField(DMT.UID,
uid).setTimeout(PUT_TIMEOUT).setType(DMT.FNPRouteNotFound);
MessageFilter mfDataInsertRejected =
MessageFilter.create().setSource(next).setField(DMT.UID,
uid).setTimeout(PUT_TIMEOUT).setType(DMT.FNPDataInsertRejected);
+ MessageFilter mfTimeout =
MessageFilter.create().setSource(next).setField(DMT.UID,
uid).setTimeout(PUT_TIMEOUT).setType(DMT.FNPRejectedTimeout);
- mf =
mfRNF.or(mfInsertReply.or(mfRouteNotFound.or(mfDataInsertRejected.or(mfRejectedOverload.setTimeout(PUT_TIMEOUT)))));
+ mf =
mfRNF.or(mfInsertReply.or(mfRouteNotFound.or(mfDataInsertRejected.or(mfTimeout.or(mfRejectedOverload)))));
Logger.minor(this, "Sending DataInsert");
if(receiveFailed) return;
@@ -179,8 +187,10 @@
Logger.minor(this, "Sending data");
if(receiveFailed) return;
- bt.send();
- Logger.minor(this, "Sent data");
+ s = new Sender();
+ senderThread = new Thread(s);
+ senderThread.setDaemon(true);
+ senderThread.start();
if(receiveFailed) return;
try {
@@ -200,7 +210,7 @@
return;
}
- if(msg.getSpec() == DMT.FNPRejectedOverload) {
+ if(msg.getSpec() == DMT.FNPRejectedOverload || msg.getSpec() ==
DMT.FNPRejectedTimeout) {
Logger.minor(this, "Rejected due to overload");
finish(REJECTED_OVERLOAD);
return;
@@ -255,14 +265,19 @@
}
- // Otherwise should be an InsertReply
+ if(msg.getSpec() != DMT.FNPInsertReply) {
+ Logger.error(this, "Unknown reply: "+msg);
+ finish(INTERNAL_ERROR);
+ }
+
// Our task is complete
finish(SUCCESS);
return;
}
} catch (Throwable t) {
Logger.error(this, "Caught "+t, t);
- finish(INTERNAL_ERROR);
+ if(status == NOT_FINISHED)
+ finish(INTERNAL_ERROR);
} finally {
node.completed(uid);
node.removeInsertSender(myKey, origHTL, this);
@@ -280,12 +295,30 @@
} catch (InterruptedException e) {
// Ignore
}
- }
+ }
}
private void finish(int code) {
Logger.minor(this, "Finished: "+code+" on "+this, new
Exception("debug"));
+ if(status != NOT_FINISHED)
+ throw new IllegalStateException("finish() called with "+code+"
when was already "+status);
status = code;
+
+ if(senderThread != null) {
+ while(senderThread.isAlive()) {
+ try {
+ senderThread.join();
+ } catch (InterruptedException e) {
+ // Ignore
+ }
+ }
+ }
+
+ if(status == REJECTED_OVERLOAD)
+ node.getInsertThrottle().requestRejectedOverload();
+ else if(status == SUCCESS || status == ROUTE_NOT_FOUND)
+
node.getInsertThrottle().requestCompleted(System.currentTimeMillis() -
startTime);
+
synchronized(this) {
notifyAll();
}
Modified: trunk/freenet/src/freenet/node/Node.java
===================================================================
--- trunk/freenet/src/freenet/node/Node.java 2005-11-16 19:29:35 UTC (rev
7546)
+++ trunk/freenet/src/freenet/node/Node.java 2005-11-17 20:59:49 UTC (rev
7547)
@@ -27,6 +27,7 @@
import freenet.client.ArchiveManager;
import freenet.client.HighLevelSimpleClient;
import freenet.client.HighLevelSimpleClientImpl;
+import freenet.client.InsertBlock;
import freenet.crypt.DiffieHellman;
import freenet.crypt.RandomSource;
import freenet.crypt.Yarrow;
@@ -61,7 +62,7 @@
/**
* @author amphibian
*/
-public class Node implements SimpleLowLevelClient {
+public class Node implements QueueingSimpleLowLevelClient {
static final long serialVersionUID = -1;
@@ -134,6 +135,10 @@
// Client stuff
final ArchiveManager archiveManager;
final BucketFactory tempBucketFactory;
+ final RequestThrottle requestThrottle;
+ final RequestStarter requestStarter;
+ final RequestThrottle insertThrottle;
+ final RequestStarter insertStarter;
// Client stuff that needs to be configged - FIXME
static final int MAX_ARCHIVE_HANDLERS = 200; // don't take up much RAM...
FIXME
@@ -340,6 +345,12 @@
}
tempBucketFactory = new
PaddedEphemerallyEncryptedBucketFactory(new
TempBucketFactory(tempFilenameGenerator), random, 1024);
archiveManager = new ArchiveManager(MAX_ARCHIVE_HANDLERS,
MAX_CACHED_ARCHIVE_DATA, MAX_ARCHIVE_SIZE, MAX_ARCHIVED_FILE_SIZE,
MAX_CACHED_ELEMENTS, random, tempFilenameGenerator);
+ requestThrottle = new RequestThrottle(5000, 2.0F);
+ requestStarter = new RequestStarter(requestThrottle, "Request
starter ("+portNumber+")");
+ //insertThrottle = new ChainedRequestThrottle(10000, 2.0F,
requestThrottle);
+ // FIXME reenable the above
+ insertThrottle = new RequestThrottle(10000, 2.0F);
+ insertStarter = new RequestStarter(insertThrottle, "Insert
starter ("+portNumber+")");
}
void start(SwapRequestInterval interval) {
@@ -349,9 +360,16 @@
usm.start();
}
- public KeyBlock getKey(ClientKey key, boolean localOnly) throws
LowLevelGetException {
+ public KeyBlock getKey(ClientKey key, boolean localOnly,
RequestStarterClient client) throws LowLevelGetException {
+ if(localOnly)
+ return realGetKey(key, localOnly);
+ else
+ return client.getKey(key, localOnly);
+ }
+
+ public KeyBlock realGetKey(ClientKey key, boolean localOnly) throws
LowLevelGetException {
if(key instanceof ClientCHK)
- return getCHK((ClientCHK)key, localOnly);
+ return realGetCHK((ClientCHK)key, localOnly);
else
throw new IllegalArgumentException("Not a CHK: "+key);
}
@@ -360,7 +378,7 @@
* Really trivially simple client interface.
* Either it succeeds or it doesn't.
*/
- public ClientCHKBlock getCHK(ClientCHK key, boolean localOnly) throws
LowLevelGetException {
+ ClientCHKBlock realGetCHK(ClientCHK key, boolean localOnly) throws
LowLevelGetException {
Object o = makeRequestSender(key.getNodeCHK(), MAX_HTL,
random.nextLong(), null, lm.loc.getValue(), localOnly);
if(o instanceof CHKBlock) {
try {
@@ -407,7 +425,11 @@
}
}
- public void putCHK(ClientCHKBlock block) throws LowLevelPutException {
+ public void putCHK(ClientCHKBlock block, RequestStarterClient starter)
throws LowLevelPutException {
+ starter.putCHK(block);
+ }
+
+ public void realPutCHK(ClientCHKBlock block) throws LowLevelPutException {
byte[] data = block.getData();
byte[] headers = block.getHeader();
PartiallyReceivedBlock prb = new
PartiallyReceivedBlock(PACKETS_IN_BLOCK, PACKET_SIZE, data);
@@ -786,8 +808,8 @@
writeNodeFile();
}
- public HighLevelSimpleClient makeClient() {
- return new HighLevelSimpleClientImpl(this, archiveManager,
tempBucketFactory, random);
+ public HighLevelSimpleClient makeClient(short prioClass, short prio) {
+ return new HighLevelSimpleClientImpl(this, archiveManager,
tempBucketFactory, random, makeStarterClient(prioClass, prio, false),
makeStarterClient(prioClass, prio, true));
}
private static class MemoryChecker implements Runnable {
@@ -804,4 +826,16 @@
}
}
}
+
+ public RequestThrottle getRequestThrottle() {
+ return requestThrottle;
+ }
+
+ public RequestThrottle getInsertThrottle() {
+ return insertThrottle;
+ }
+
+ public RequestStarterClient makeStarterClient(short prioClass, short
prio, boolean inserts) {
+ return new RequestStarterClient(prioClass, prio, random, this,
inserts ? insertStarter : requestStarter);
+ }
}
Added: trunk/freenet/src/freenet/node/QueuedDataRequest.java
===================================================================
--- trunk/freenet/src/freenet/node/QueuedDataRequest.java 2005-11-16
19:29:35 UTC (rev 7546)
+++ trunk/freenet/src/freenet/node/QueuedDataRequest.java 2005-11-17
20:59:49 UTC (rev 7547)
@@ -0,0 +1,23 @@
+package freenet.node;
+
+import freenet.keys.ClientKey;
+import freenet.keys.KeyBlock;
+
+public class QueuedDataRequest extends QueuedRequest {
+
+ private final ClientKey key;
+ private final boolean localOnly;
+ private QueueingSimpleLowLevelClient client;
+
+ public QueuedDataRequest(ClientKey key, boolean localOnly,
QueueingSimpleLowLevelClient client) {
+ this.key = key;
+ this.localOnly = localOnly;
+ this.client = client;
+ }
+
+ public KeyBlock waitAndFetch() throws LowLevelGetException {
+ waitForSendClearance();
+ return client.realGetKey(key, localOnly);
+ }
+
+}
Added: trunk/freenet/src/freenet/node/QueuedInsertRequest.java
===================================================================
--- trunk/freenet/src/freenet/node/QueuedInsertRequest.java 2005-11-16
19:29:35 UTC (rev 7546)
+++ trunk/freenet/src/freenet/node/QueuedInsertRequest.java 2005-11-17
20:59:49 UTC (rev 7547)
@@ -0,0 +1,19 @@
+package freenet.node;
+
+import freenet.keys.ClientCHKBlock;
+
+public class QueuedInsertRequest extends QueuedRequest {
+
+ private final ClientCHKBlock block;
+ private QueueingSimpleLowLevelClient client;
+
+ public QueuedInsertRequest(ClientCHKBlock block,
QueueingSimpleLowLevelClient client) {
+ this.block = block;
+ this.client = client;
+ }
+
+ public void waitAndPut() throws LowLevelPutException {
+ waitForSendClearance();
+ client.realPutCHK(block);
+ }
+}
Added: trunk/freenet/src/freenet/node/QueuedRequest.java
===================================================================
--- trunk/freenet/src/freenet/node/QueuedRequest.java 2005-11-16 19:29:35 UTC
(rev 7546)
+++ trunk/freenet/src/freenet/node/QueuedRequest.java 2005-11-17 20:59:49 UTC
(rev 7547)
@@ -0,0 +1,32 @@
+package freenet.node;
+
+/**
+ * A request (including both DataRequest's and InsertRequest's) which can be
queued
+ * by a RequestStarter.
+ */
+public abstract class QueuedRequest {
+
+ private boolean clearToSend = false;
+
+ /**
+ * Shell for sending the request.
+ */
+ public final void clearToSend() {
+ synchronized(this) {
+ clearToSend = true;
+ notifyAll();
+ }
+ }
+
+ protected void waitForSendClearance() {
+ synchronized(this) {
+ while(!clearToSend) {
+ try {
+ wait(10*1000);
+ } catch (InterruptedException e) {
+ // Ignore
+ }
+ }
+ }
+ }
+}
Added: trunk/freenet/src/freenet/node/QueueingSimpleLowLevelClient.java
===================================================================
--- trunk/freenet/src/freenet/node/QueueingSimpleLowLevelClient.java
2005-11-16 19:29:35 UTC (rev 7546)
+++ trunk/freenet/src/freenet/node/QueueingSimpleLowLevelClient.java
2005-11-17 20:59:49 UTC (rev 7547)
@@ -0,0 +1,16 @@
+package freenet.node;
+
+import freenet.client.InsertBlock;
+import freenet.keys.ClientCHKBlock;
+import freenet.keys.ClientKey;
+import freenet.keys.KeyBlock;
+
+interface QueueingSimpleLowLevelClient extends SimpleLowLevelClient {
+
+ /** Unqueued version. Only call from QueuedDataRequest ! */
+ KeyBlock realGetKey(ClientKey key, boolean localOnly) throws
LowLevelGetException;
+
+ /** Ditto */
+ void realPutCHK(ClientCHKBlock block) throws LowLevelPutException;
+
+}
Modified: trunk/freenet/src/freenet/node/RealNodeRequestInsertTest.java
===================================================================
--- trunk/freenet/src/freenet/node/RealNodeRequestInsertTest.java
2005-11-16 19:29:35 UTC (rev 7546)
+++ trunk/freenet/src/freenet/node/RealNodeRequestInsertTest.java
2005-11-17 20:59:49 UTC (rev 7547)
@@ -9,6 +9,7 @@
import freenet.keys.CHKEncodeException;
import freenet.keys.ClientCHK;
import freenet.keys.ClientCHKBlock;
+import freenet.keys.ClientKey;
import freenet.node.PeerNode;
import freenet.support.Fields;
import freenet.support.HexUtil;
@@ -70,6 +71,10 @@
a.peers.connect(b.exportFieldSet());
b.peers.connect(a.exportFieldSet());
}
+
+ RequestStarterClient[] starters = new
RequestStarterClient[NUMBER_OF_NODES];
+ for(int i=0;i<starters.length;i++)
+ starters[i] =
nodes[i].makeStarterClient(RequestStarter.INTERACTIVE_PRIORITY_CLASS, (short)0,
false); // pretend are all requests
Logger.normal(RealNodeRoutingTest.class, "Added random links");
@@ -164,7 +169,7 @@
String dataString = baseString + requestNumber;
// Pick random node to insert to
int node1 = random.nextInt(NUMBER_OF_NODES);
- Node randomNode = nodes[random.nextInt(NUMBER_OF_NODES)];
+ Node randomNode = nodes[node1];
Logger.error(RealNodeRequestInsertTest.class,"Inserting:
\""+dataString+"\" to "+node1);
byte[] data = dataString.getBytes();
ClientCHKBlock block;
@@ -176,7 +181,7 @@
Logger.error(RealNodeRequestInsertTest.class, "Decoded: "+new
String(newBlock.memoryDecode(chk)));
Logger.error(RealNodeRequestInsertTest.class,"CHK:
"+chk.getURI());
Logger.error(RealNodeRequestInsertTest.class,"Headers:
"+HexUtil.bytesToHex(block.getHeader()));
- randomNode.putCHK(block);
+ randomNode.putCHK(block, starters[node1]);
Logger.error(RealNodeRequestInsertTest.class, "Inserted to
"+node1);
Logger.error(RealNodeRequestInsertTest.class, "Data:
"+Fields.hashCode(encData)+", Headers: "+Fields.hashCode(encHeaders));
// Pick random node to request from
@@ -185,7 +190,7 @@
node2 = random.nextInt(NUMBER_OF_NODES);
} while(node2 == node1);
Node fetchNode = nodes[node2];
- block = fetchNode.getCHK(chk, false);
+ block = (ClientCHKBlock) fetchNode.getKey((ClientKey) chk,
false, starters[node2]);
if(block == null) {
Logger.error(RealNodeRequestInsertTest.class, "Fetch
FAILED from "+node2);
requestsAvg.report(0.0);
Modified: trunk/freenet/src/freenet/node/RequestSender.java
===================================================================
--- trunk/freenet/src/freenet/node/RequestSender.java 2005-11-16 19:29:35 UTC
(rev 7546)
+++ trunk/freenet/src/freenet/node/RequestSender.java 2005-11-17 20:59:49 UTC
(rev 7547)
@@ -40,6 +40,7 @@
final long uid;
final Node node;
private double nearestLoc;
+ private final long startTime;
/** The source of this request if any - purely so we can avoid routing to
it */
final PeerNode source;
private PartiallyReceivedBlock prb = null;
@@ -65,6 +66,7 @@
public RequestSender(NodeCHK key, short htl, long uid, Node n, double
nearestLoc,
PeerNode source) {
+ startTime = System.currentTimeMillis();
this.key = key;
this.htl = htl;
this.uid = uid;
@@ -295,8 +297,15 @@
private void finish(int code) {
Logger.minor(this, "finish("+code+")");
+ if(status != NOT_FINISHED)
+ throw new IllegalStateException("finish() called with "+code+"
when was already "+status);
status = code;
-
+
+ if(status == REJECTED_OVERLOAD)
+ node.getRequestThrottle().requestRejectedOverload();
+ else if(status == SUCCESS || status == ROUTE_NOT_FOUND || status ==
DATA_NOT_FOUND || status == VERIFY_FAILURE)
+
node.getRequestThrottle().requestCompleted(System.currentTimeMillis() -
startTime);
+
synchronized(this) {
notifyAll();
}
Added: trunk/freenet/src/freenet/node/RequestStarter.java
===================================================================
--- trunk/freenet/src/freenet/node/RequestStarter.java 2005-11-16 19:29:35 UTC
(rev 7546)
+++ trunk/freenet/src/freenet/node/RequestStarter.java 2005-11-17 20:59:49 UTC
(rev 7547)
@@ -0,0 +1,187 @@
+package freenet.node;
+
+import java.util.LinkedList;
+import java.util.Vector;
+
+import freenet.support.Logger;
+import freenet.support.UpdatableSortedLinkedList;
+import freenet.support.UpdatableSortedLinkedListKilledException;
+
+/**
+ * Starts requests.
+ * Nobody starts a request directly, you have to go through RequestStarter.
+ * And you have to provide a RequestStarterClient. We do round robin between
+ * clients on the same priority level.
+ */
+public class RequestStarter implements Runnable {
+
+ /*
+ * Priority classes
+ */
+ /** Anything more important than fproxy */
+ public static final short MAXIMUM_PRIORITY_CLASS = 0;
+ /** Fproxy etc */
+ public static final short INTERACTIVE_PRIORITY_CLASS = 1;
+ /** Fproxy splitfile fetches */
+ public static final short IMMEDIATE_SPLITFILE_PRIORITY_CLASS = 2;
+ /** USK updates etc */
+ public static final short UPDATE_PRIORITY_CLASS = 3;
+ /** Bulk splitfile fetches */
+ public static final short BULK_SPLITFILE_PRIORITY_CLASS = 4;
+ /** Prefetch */
+ public static final short PREFETCH_PRIORITY_CLASS = 5;
+ /** Anything less important than prefetch (redundant??) */
+ public static final short MINIMUM_PRIORITY_CLASS = 6;
+
+ // Clients registered
+ final Vector clientsByPriority;
+ final RequestThrottle throttle;
+ /*
+ * Clients which are ready.
+ * How do we do round-robin?
+ * Have a list of clients which are ready to go, in priority order, and
+ * haven't gone this cycle.
+ * Have a list of clients which are ready to go next cycle, in priority
+ * order.
+ * Have each client track the cycle number in which it was last sent.
+ */
+ final UpdatableSortedLinkedList clientsReadyThisCycle;
+ final UpdatableSortedLinkedList clientsReadyNextCycle;
+ /** Increment every time we go through the whole list */
+ long cycleNumber;
+
+ public RequestStarter(RequestThrottle throttle, String name) {
+ clientsByPriority = new Vector();
+ clientsReadyThisCycle = new UpdatableSortedLinkedList();
+ clientsReadyNextCycle = new UpdatableSortedLinkedList();
+ cycleNumber = 0;
+ this.throttle = throttle;
+ this.name = name;
+ Thread t = new Thread(this, "Request starter");
+ t.setDaemon(true);
+ t.start();
+ }
+
+ final String name;
+
+ public String toString() {
+ return name;
+ }
+
+ public synchronized void registerClient(RequestStarterClient client) {
+ int p = client.priority;
+ LinkedList prio = makePriority(p);
+ prio.add(client);
+ }
+
+ public synchronized void notifyReady(RequestStarterClient client) {
+ Logger.minor(this, "notifyReady("+client+")");
+ try {
+ if(client.getCycleLastSent() == cycleNumber) {
+ clientsReadyNextCycle.addOrUpdate(client);
+ } else {
+ // Can send immediately
+ clientsReadyThisCycle.addOrUpdate(client);
+ }
+ } catch (UpdatableSortedLinkedListKilledException e) {
+ throw new Error(e);
+ }
+ notifyAll();
+ }
+
+ private synchronized LinkedList makePriority(int p) {
+ while(p >= clientsByPriority.size()) {
+ clientsByPriority.add(new LinkedList());
+ }
+ return (LinkedList) clientsByPriority.get(p);
+ }
+
+ public void run() {
+ long sentRequestTime = System.currentTimeMillis();
+ while(true) {
+ RequestStarterClient client;
+ client = getNextClient();
+ Logger.minor(this, "getNextClient() = "+client);
+ if(client != null) {
+ boolean success;
+ try {
+ success = client.send(cycleNumber);
+ } catch (Throwable t) {
+ Logger.error(this, "Caught "+t);
+ continue;
+ }
+ if(success) {
+ sentRequestTime =
System.currentTimeMillis();
+ Logger.minor(this, "Sent");
+ if(client.isReady()) {
+ synchronized(this) {
+ try {
+
clientsReadyNextCycle.addOrUpdate(client);
+ } catch
(UpdatableSortedLinkedListKilledException e) {
+ // Impossible
+ throw new
Error(e);
+ }
+ }
+ }
+ }
+ }
+ while(true) {
+ long delay = throttle.getDelay();
+ long sleepUntil = sentRequestTime + delay;
+ long now = System.currentTimeMillis();
+ if(sleepUntil < now) {
+ if(waitingClients()) break;
+ // Otherwise wait for notification
+ try {
+ synchronized(this) {
+ wait(1000);
+ }
+ } catch (InterruptedException e) {
+ // Ignore
+ }
+ } else {
+ Logger.minor(this,
"delay="+delay+"("+throttle+") sleep for "+(sleepUntil-now)+" for "+this);
+ if(sleepUntil - now > 0)
+ try {
+ synchronized(this) {
+ // At most
sleep 500ms, then recompute.
+
wait(Math.min(sleepUntil - now, 500));
+ }
+ } catch (InterruptedException
e) {
+ // Ignore
+ }
+ }
+ }
+ }
+ }
+
+ private synchronized boolean waitingClients() {
+ return !(clientsReadyThisCycle.isEmpty() &&
clientsReadyNextCycle.isEmpty());
+ }
+
+ /**
+ * Get the next ready client.
+ */
+ private synchronized RequestStarterClient getNextClient() {
+ try {
+ while(true) {
+ if(clientsReadyThisCycle.isEmpty() &&
clientsReadyNextCycle.isEmpty())
+ return null;
+ if(clientsReadyThisCycle.isEmpty()) {
+ cycleNumber++;
+
clientsReadyNextCycle.moveTo(clientsReadyThisCycle);
+ }
+ RequestStarterClient c = (RequestStarterClient)
clientsReadyThisCycle.removeLowest();
+ if(c.getCycleLastSent() == cycleNumber) {
+ clientsReadyNextCycle.add(c);
+ continue;
+ } else {
+ c.setCycleLastSet(cycleNumber);
+ return c;
+ }
+ }
+ } catch (UpdatableSortedLinkedListKilledException e) {
+ throw new Error(e);
+ }
+ }
+}
Added: trunk/freenet/src/freenet/node/RequestStarterClient.java
===================================================================
--- trunk/freenet/src/freenet/node/RequestStarterClient.java 2005-11-16
19:29:35 UTC (rev 7546)
+++ trunk/freenet/src/freenet/node/RequestStarterClient.java 2005-11-17
20:59:49 UTC (rev 7547)
@@ -0,0 +1,116 @@
+package freenet.node;
+
+import java.util.Vector;
+
+import freenet.crypt.RandomSource;
+import freenet.keys.ClientCHKBlock;
+import freenet.keys.ClientKey;
+import freenet.keys.KeyBlock;
+import freenet.support.DoublyLinkedList;
+import freenet.support.UpdatableSortedLinkedListItemImpl;
+
+/**
+ * Interface to clients for starting a request.
+ * Also represents a single client for fairness purposes.
+ */
+public class RequestStarterClient extends UpdatableSortedLinkedListItemImpl {
+
+ final int priority;
+ private int random;
+ private long cycleLastSent;
+ private final Vector requests;
+ private final RandomSource rs;
+ private final QueueingSimpleLowLevelClient client;
+ private final RequestStarter starter;
+
+ public RequestStarterClient(short prioClass, short prio, RandomSource
r, QueueingSimpleLowLevelClient c, RequestStarter starter) {
+ this((prioClass << 16) + prio, r, c, starter);
+ }
+
+ private RequestStarterClient(int prio, RandomSource r,
QueueingSimpleLowLevelClient c, RequestStarter starter) {
+ priority = prio;
+ this.random = r.nextInt();
+ this.starter = starter;
+ this.cycleLastSent = -1;
+ this.requests = new Vector();
+ this.rs = r;
+ this.client = c;
+ starter.registerClient(this);
+ }
+
+ /**
+ * Blocking fetch of a key.
+ * @throws LowLevelGetException If the fetch failed for some reason.
+ */
+ public KeyBlock getKey(ClientKey key, boolean localOnly) throws
LowLevelGetException {
+ QueuedDataRequest qdr = new QueuedDataRequest(key, localOnly,
client);
+ addRequest(qdr);
+ return qdr.waitAndFetch();
+ }
+
+ /**
+ * Blocking insert of a key.
+ * @throws LowLevelPutException If the fetch failed for some reason.
+ */
+ public void putCHK(ClientCHKBlock block) throws LowLevelPutException {
+ QueuedInsertRequest qir = new QueuedInsertRequest(block,
client);
+ addRequest(qir);
+ qir.waitAndPut();
+ }
+
+ void addRequest(QueuedRequest qr) {
+ synchronized(this) {
+ requests.add(qr);
+ }
+ if(starter != null)
+ starter.notifyReady(this);
+ }
+
+ public long getCycleLastSent() {
+ return cycleLastSent;
+ }
+
+ private DoublyLinkedList parentList;
+
+ public DoublyLinkedList getParent() {
+ return parentList;
+ }
+
+ public DoublyLinkedList setParent(DoublyLinkedList l) {
+ DoublyLinkedList oldList = parentList;
+ parentList = l;
+ return oldList;
+ }
+
+ public int compareTo(Object o) {
+ if(this == o) return 0;
+ RequestStarterClient c = (RequestStarterClient) o;
+ if(priority > c.priority) return 1;
+ if(priority < c.priority) return -1;
+ if(random > c.random) return 1;
+ if(random < c.random) return -1;
+ return 0;
+ }
+
+ public synchronized boolean isReady() {
+ return !requests.isEmpty();
+ }
+
+ public boolean send(long cycleNumber) {
+ QueuedRequest qr;
+ synchronized(this) {
+ if(!requests.isEmpty()) {
+ int x = rs.nextInt(requests.size());
+ qr = (QueuedRequest) requests.remove(x);
+ } else qr = null;
+ }
+ if(qr == null) return false;
+ qr.clearToSend();
+ return true;
+ }
+
+ public void setCycleLastSet(long cycleNumber) {
+ this.cycleLastSent = cycleNumber;
+ }
+
+}
Added: trunk/freenet/src/freenet/node/RequestThrottle.java
===================================================================
--- trunk/freenet/src/freenet/node/RequestThrottle.java 2005-11-16 19:29:35 UTC
(rev 7546)
+++ trunk/freenet/src/freenet/node/RequestThrottle.java 2005-11-17 20:59:49 UTC
(rev 7547)
@@ -0,0 +1,69 @@
+package freenet.node;
+
+import freenet.support.math.BootstrappingDecayingRunningAverage;
+
+/**
+ * Keeps track of the current request send rate.
+ */
+public class RequestThrottle {
+
+ protected static final float PACKET_DROP_DECREASE_MULTIPLE = 0.97f;
+ protected static final float PACKET_TRANSMIT_INCREMENT = (4 * (1 -
(PACKET_DROP_DECREASE_MULTIPLE * PACKET_DROP_DECREASE_MULTIPLE))) / 3;
+ protected static final long MAX_DELAY = 5*60*1000;
+ protected static final long MIN_DELAY = 20;
+ public static final long DEFAULT_DELAY = 200;
+ private long _totalPackets = 0, _droppedPackets = 0;
+ private double _simulatedWindowSize = 2;
+ private final BootstrappingDecayingRunningAverage roundTripTime;
+
+ RequestThrottle(long rtt, float winSize) {
+ _simulatedWindowSize = 2;
+ roundTripTime = new BootstrappingDecayingRunningAverage(rtt,
10, 5*60*1000, 10);
+ }
+
+ /**
+ * Get the current inter-request delay.
+ */
+ public synchronized long getDelay() {
+ double rtt = roundTripTime.currentValue();
+ double winSizeForMinPacketDelay = rtt / MIN_DELAY;
+ if (_simulatedWindowSize > winSizeForMinPacketDelay) {
+ _simulatedWindowSize = winSizeForMinPacketDelay;
+ }
+ if (_simulatedWindowSize < 1.0) {
+ _simulatedWindowSize = 1.0F;
+ }
+ // return (long) (_roundTripTime / _simulatedWindowSize);
+ return Math.max(MIN_DELAY, Math.min((long) (rtt /
_simulatedWindowSize), MAX_DELAY));
+ }
+
+ /**
+ * Report that a request completed successfully, and the
+ * time it took.
+ */
+ public synchronized void requestCompleted(long time) {
+ setRoundTripTime(time);
+ _totalPackets++;
+ _simulatedWindowSize += PACKET_TRANSMIT_INCREMENT;
+ }
+
+ /**
+ * Report that a request got RejectedOverload.
+ * Do not report the time it took, because it is irrelevant.
+ */
+ public synchronized void requestRejectedOverload() {
+ _droppedPackets++;
+ _totalPackets++;
+ _simulatedWindowSize *= PACKET_DROP_DECREASE_MULTIPLE;
+ }
+
+ private synchronized void setRoundTripTime(long rtt) {
+ roundTripTime.report(Math.max(rtt, 10));
+ }
+
+ public synchronized String toString() {
+ return getDelay()+" ms, (w: "
+ + _simulatedWindowSize + ", r:" +
roundTripTime.currentValue() + ", d:"
+ + (((float) _droppedPackets / (float)
_totalPackets)) + "="+_droppedPackets+"/"+_totalPackets + ")";
+ }
+}
Modified: trunk/freenet/src/freenet/node/SimpleLowLevelClient.java
===================================================================
--- trunk/freenet/src/freenet/node/SimpleLowLevelClient.java 2005-11-16
19:29:35 UTC (rev 7546)
+++ trunk/freenet/src/freenet/node/SimpleLowLevelClient.java 2005-11-17
20:59:49 UTC (rev 7547)
@@ -17,10 +17,10 @@
/**
* Fetch a key. Throws if it cannot fetch it.
*/
- public KeyBlock getKey(ClientKey key, boolean localOnly) throws
LowLevelGetException;
+ public KeyBlock getKey(ClientKey key, boolean localOnly,
RequestStarterClient client) throws LowLevelGetException;
/**
* Insert a key.
*/
- public void putCHK(ClientCHKBlock key) throws LowLevelPutException;
+ public void putCHK(ClientCHKBlock key, RequestStarterClient sctx) throws
LowLevelPutException;
}
Modified: trunk/freenet/src/freenet/node/TextModeClientInterface.java
===================================================================
--- trunk/freenet/src/freenet/node/TextModeClientInterface.java 2005-11-16
19:29:35 UTC (rev 7546)
+++ trunk/freenet/src/freenet/node/TextModeClientInterface.java 2005-11-17
20:59:49 UTC (rev 7547)
@@ -46,14 +46,18 @@
final Node n;
final HighLevelSimpleClient client;
final Hashtable streams;
+ final RequestStarterClient requestStarterClient;
+ final RequestStarterClient insertStarterClient;
TextModeClientInterface(Node n) {
this.n = n;
- client = n.makeClient();
+ client = n.makeClient(RequestStarter.INTERACTIVE_PRIORITY_CLASS,
(short)0);
client.addGlobalHook(new EventDumper());
this.r = n.random;
streams = new Hashtable();
- new Thread(this).start();
+ new Thread(this, "Text mode client interface").start();
+ this.requestStarterClient =
n.makeStarterClient(RequestStarter.INTERACTIVE_PRIORITY_CLASS, (short)0, false);
+ this.insertStarterClient =
n.makeStarterClient(RequestStarter.INTERACTIVE_PRIORITY_CLASS, (short)0, true);
}
/**
Modified: trunk/freenet/src/freenet/node/Version.java
===================================================================
--- trunk/freenet/src/freenet/node/Version.java 2005-11-16 19:29:35 UTC (rev
7546)
+++ trunk/freenet/src/freenet/node/Version.java 2005-11-17 20:59:49 UTC (rev
7547)
@@ -20,10 +20,10 @@
public static final String protocolVersion = "1.0";
/** The build number of the current revision */
- public static final int buildNumber = 183;
+ public static final int buildNumber = 184;
/** Oldest build of Fred we will talk to */
- public static final int lastGoodBuild = 183;
+ public static final int lastGoodBuild = 184;
/** The highest reported build of fred */
public static int highestSeenBuild = buildNumber;
Modified: trunk/freenet/src/freenet/support/UpdatableSortedLinkedList.java
===================================================================
--- trunk/freenet/src/freenet/support/UpdatableSortedLinkedList.java
2005-11-16 19:29:35 UTC (rev 7546)
+++ trunk/freenet/src/freenet/support/UpdatableSortedLinkedList.java
2005-11-17 20:59:49 UTC (rev 7547)
@@ -2,6 +2,8 @@
import java.util.Enumeration;
+import freenet.node.RequestStarterClient;
+
/**
* @author amphibian
*
@@ -88,6 +90,14 @@
return item;
}
+ public synchronized void addOrUpdate(UpdatableSortedLinkedListItem
item) throws UpdatableSortedLinkedListKilledException {
+ if(item.getParent() == list)
+ update(item);
+ else if(item.getParent() == null)
+ add(item);
+ else throw new IllegalStateException("Item "+item+" should be
on our list: "+list+" or null, but is "+item.getParent());
+ }
+
public synchronized void update(UpdatableSortedLinkedListItem i) throws
UpdatableSortedLinkedListKilledException {
if(killed) throw new UpdatableSortedLinkedListKilledException();
Logger.minor(this, "Update("+i+") on "+this);
@@ -223,4 +233,20 @@
clear();
killed = true;
}
+
+ public synchronized UpdatableSortedLinkedListItem removeLowest() throws
UpdatableSortedLinkedListKilledException {
+ if(isEmpty()) return null;
+ UpdatableSortedLinkedListItem i = getLowest();
+ remove(i);
+ return i;
+ }
+
+ public synchronized void moveTo(UpdatableSortedLinkedList dest) throws
UpdatableSortedLinkedListKilledException {
+ Enumeration e = list.elements();
+ while(e.hasMoreElements()) {
+ UpdatableSortedLinkedListItem item =
(UpdatableSortedLinkedListItem) e.nextElement();
+ remove(item);
+ dest.add(item);
+ }
+ }
}