Author: toad
Date: 2006-08-12 23:06:07 +0000 (Sat, 12 Aug 2006)
New Revision: 10053
Modified:
trunk/freenet/src/freenet/node/Node.java
trunk/freenet/src/freenet/node/NodeClientCore.java
trunk/freenet/src/freenet/node/RequestSender.java
trunk/freenet/src/freenet/node/RequestStarterGroup.java
Log:
Don't report round trip time (for requests) if the request hasn't got off the
node.
Save load limiting data in Node so can save all the stats as well.
Modified: trunk/freenet/src/freenet/node/Node.java
===================================================================
--- trunk/freenet/src/freenet/node/Node.java 2006-08-12 22:53:35 UTC (rev
10052)
+++ trunk/freenet/src/freenet/node/Node.java 2006-08-12 23:06:07 UTC (rev
10053)
@@ -6,6 +6,7 @@
*/
package freenet.node;
+import java.io.BufferedOutputStream;
import java.io.BufferedReader;
import java.io.ByteArrayOutputStream;
import java.io.File;
@@ -15,6 +16,7 @@
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
+import java.io.UnsupportedEncodingException;
import java.net.InetAddress;
import java.net.MalformedURLException;
import java.net.UnknownHostException;
@@ -415,6 +417,8 @@
final TimeDecayingRunningAverage localSskFetchBytesReceivedAverage;
final TimeDecayingRunningAverage localChkInsertBytesReceivedAverage;
final TimeDecayingRunningAverage localSskInsertBytesReceivedAverage;
+ final File persistTarget;
+ final File persistTemp;
// The version we were before we restarted.
public int lastVersion;
@@ -1192,7 +1196,23 @@
throw new NodeInitException(EXIT_STORE_OTHER, msg);
}
+ persistTarget = new File(nodeDir, "throttle.dat");
+ persistTemp = new File(nodeDir, "throttle.dat.tmp");
+ SimpleFieldSet throttleFS = null;
+ try {
+ throttleFS = SimpleFieldSet.readFrom(persistTarget);
+ } catch (IOException e) {
+ try {
+ throttleFS =
SimpleFieldSet.readFrom(persistTemp);
+ } catch (FileNotFoundException e1) {
+ // Ignore
+ } catch (IOException e1) {
+ Logger.error(this, "Could not read
"+persistTarget+" ("+e+") and could not read "+persistTemp+" either ("+e1+")");
+ }
+ }
+ Logger.minor(this, "Read throttleFS:\n"+throttleFS);
+
// Guesstimates. Hopefully well over the reality.
localChkFetchBytesSentAverage = new
TimeDecayingRunningAverage(500, 180000, 0.0, Long.MAX_VALUE);
localSskFetchBytesSentAverage = new
TimeDecayingRunningAverage(500, 180000, 0.0, Long.MAX_VALUE);
@@ -1212,7 +1232,7 @@
remoteChkInsertBytesReceivedAverage = new
TimeDecayingRunningAverage(32768+1024+500, 180000, 0.0, Long.MAX_VALUE);
remoteSskInsertBytesReceivedAverage = new
TimeDecayingRunningAverage(1024+1024+500, 180000, 0.0, Long.MAX_VALUE);
- clientCore = new NodeClientCore(this, config, nodeConfig,
nodeDir, portNumber, sortOrder);
+ clientCore = new NodeClientCore(this, config, nodeConfig,
nodeDir, portNumber, sortOrder, throttleFS == null ? null :
throttleFS.subset("RequestStarters"));
nodeConfig.finishedInitialization();
writeNodeFile();
@@ -1314,6 +1334,11 @@
// Process any data in the extra peer data directory
peers.readExtraPeerData();
+ ThrottlePersister persister = new ThrottlePersister();
+ Thread t = new Thread(persister, "Throttle data persister
thread");
+ t.setDaemon(true);
+ t.start();
+
hasStarted = true;
}
@@ -2723,4 +2748,80 @@
public boolean isAdvancedDarknetEnabled() {
return clientCore.isAdvancedDarknetEnabled();
}
+
+ // FIXME convert these kind of threads to Checkpointed's and implement
a handler
+ // using the PacketSender/Ticker. Would save a few threads.
+
+ class ThrottlePersister implements Runnable {
+
+ public void run() {
+ while(true) {
+ try {
+ persistThrottle();
+ } catch (Throwable t) {
+ Logger.error(this, "Caught "+t, t);
+ }
+ try {
+ Thread.sleep(60*1000);
+ } catch (InterruptedException e) {
+ // Maybe it's time to wake up?
+ }
+ }
+ }
+
+ }
+
+ public void persistThrottle() {
+ SimpleFieldSet fs = persistThrottlesToFieldSet();
+ try {
+ FileOutputStream fos = new
FileOutputStream(persistTemp);
+ // FIXME common pattern, reuse it.
+ BufferedOutputStream bos = new
BufferedOutputStream(fos);
+ OutputStreamWriter osw = new OutputStreamWriter(bos,
"UTF-8");
+ try {
+ fs.writeTo(osw);
+ } catch (IOException e) {
+ try {
+ fos.close();
+ persistTemp.delete();
+ return;
+ } catch (IOException e1) {
+ // Ignore
+ }
+ }
+ try {
+ osw.close();
+ } catch (IOException e) {
+ // Huh?
+ Logger.error(this, "Caught while closing: "+e,
e);
+ return;
+ }
+ // Try an atomic rename
+ if(!persistTemp.renameTo(persistTarget)) {
+ // Not supported on some systems (Windows)
+ if(!persistTarget.delete()) {
+ if(persistTarget.exists()) {
+ Logger.error(this, "Could not
delete "+persistTarget+" - check permissions");
+ }
+ }
+ if(!persistTemp.renameTo(persistTarget)) {
+ Logger.error(this, "Could not rename
"+persistTemp+" to "+persistTarget+" - check permissions");
+ }
+ }
+ } catch (FileNotFoundException e) {
+ Logger.error(this, "Could not store throttle data to
disk: "+e, e);
+ return;
+ } catch (UnsupportedEncodingException e) {
+ Logger.error(this, "Unsupported encoding: UTF-8 !!!!:
"+e, e);
+ }
+
+ }
+
+ private SimpleFieldSet persistThrottlesToFieldSet() {
+ SimpleFieldSet fs = new SimpleFieldSet();
+ fs.put("RequestStarters",
clientCore.requestStarters.persistToFieldSet());
+ // FIXME persist the rest
+ return fs;
+ }
+
}
Modified: trunk/freenet/src/freenet/node/NodeClientCore.java
===================================================================
--- trunk/freenet/src/freenet/node/NodeClientCore.java 2006-08-12 22:53:35 UTC
(rev 10052)
+++ trunk/freenet/src/freenet/node/NodeClientCore.java 2006-08-12 23:06:07 UTC
(rev 10053)
@@ -39,6 +39,7 @@
import freenet.store.KeyCollisionException;
import freenet.support.Base64;
import freenet.support.Logger;
+import freenet.support.SimpleFieldSet;
import freenet.support.io.BucketFactory;
import freenet.support.io.FilenameGenerator;
import freenet.support.io.PaddedEphemerallyEncryptedBucketFactory;
@@ -85,14 +86,15 @@
static final long MAX_ARCHIVED_FILE_SIZE = 1024*1024; // arbitrary...
FIXME
static final int MAX_CACHED_ELEMENTS = 1024; // equally arbitrary!
FIXME hopefully we can cache many of these though
- NodeClientCore(Node node, Config config, SubConfig nodeConfig, File
nodeDir, int portNumber, int sortOrder) throws NodeInitException {
+ NodeClientCore(Node node, Config config, SubConfig nodeConfig, File
nodeDir, int portNumber, int sortOrder, SimpleFieldSet throttleFS) throws
NodeInitException {
this.node = node;
this.random = node.random;
byte[] pwdBuf = new byte[16];
random.nextBytes(pwdBuf);
this.formPassword = Base64.encode(pwdBuf);
alerts = new UserAlertManager();
- requestStarters = new RequestStarterGroup(node, this,
portNumber, random, config);
+ Logger.minor(this, "Serializing RequestStarterGroup
from:\n"+throttleFS);
+ requestStarters = new RequestStarterGroup(node, this,
portNumber, random, config, throttleFS);
// Temp files
@@ -187,8 +189,6 @@
public void start(Config config) throws NodeInitException {
- requestStarters.start();
-
// TMCI
try{
TextModeClientInterfaceServer.maybeCreate(node, config);
@@ -290,10 +290,11 @@
rejectedOverload = true;
}
} else {
- if((status == RequestSender.DATA_NOT_FOUND) ||
+ if(rs.hasForwarded() &&
+ ((status ==
RequestSender.DATA_NOT_FOUND) ||
(status ==
RequestSender.SUCCESS) ||
(status ==
RequestSender.ROUTE_NOT_FOUND) ||
- (status ==
RequestSender.VERIFY_FAILURE)) {
+ (status ==
RequestSender.VERIFY_FAILURE))) {
long rtt = System.currentTimeMillis() -
startTime;
if(!rejectedOverload)
requestStarters.throttleWindow.requestCompleted();
@@ -384,10 +385,11 @@
rejectedOverload = true;
}
} else {
- if((status == RequestSender.DATA_NOT_FOUND) ||
+ if(rs.hasForwarded() &&
+ ((status ==
RequestSender.DATA_NOT_FOUND) ||
(status ==
RequestSender.SUCCESS) ||
(status ==
RequestSender.ROUTE_NOT_FOUND) ||
- (status ==
RequestSender.VERIFY_FAILURE)) {
+ (status ==
RequestSender.VERIFY_FAILURE))) {
long rtt = System.currentTimeMillis() -
startTime;
if(!rejectedOverload)
Modified: trunk/freenet/src/freenet/node/RequestSender.java
===================================================================
--- trunk/freenet/src/freenet/node/RequestSender.java 2006-08-12 22:53:35 UTC
(rev 10052)
+++ trunk/freenet/src/freenet/node/RequestSender.java 2006-08-12 23:06:07 UTC
(rev 10053)
@@ -58,6 +58,7 @@
private byte[] headers;
private byte[] sskData;
private SSKBlock block;
+ private boolean hasForwarded;
// Terminal status
// Always set finished AFTER setting the reason flag
@@ -150,6 +151,10 @@
next.send(req, this);
+ synchronized(this) {
+ hasForwarded = true;
+ }
+
Message msg = null;
while(true) {
@@ -569,4 +574,8 @@
return totalBytesReceived;
}
}
+
+ synchronized boolean hasForwarded() {
+ return hasForwarded;
+ }
}
Modified: trunk/freenet/src/freenet/node/RequestStarterGroup.java
===================================================================
--- trunk/freenet/src/freenet/node/RequestStarterGroup.java 2006-08-12
22:53:35 UTC (rev 10052)
+++ trunk/freenet/src/freenet/node/RequestStarterGroup.java 2006-08-12
23:06:07 UTC (rev 10053)
@@ -1,13 +1,5 @@
package freenet.node;
-import java.io.BufferedOutputStream;
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.OutputStreamWriter;
-import java.io.UnsupportedEncodingException;
-
import freenet.client.async.ClientRequestScheduler;
import freenet.config.Config;
import freenet.config.SubConfig;
@@ -27,35 +19,15 @@
final RequestStarter sskRequestStarter;
final MyRequestThrottle sskInsertThrottle;
final RequestStarter sskInsertStarter;
- final File nodeDir;
- final File persistTarget;
- final File persistTemp;
public final ClientRequestScheduler chkFetchScheduler;
public final ClientRequestScheduler chkPutScheduler;
public final ClientRequestScheduler sskFetchScheduler;
public final ClientRequestScheduler sskPutScheduler;
- RequestStarterGroup(Node node, NodeClientCore core, int portNumber,
RandomSource random, Config config) {
+ RequestStarterGroup(Node node, NodeClientCore core, int portNumber,
RandomSource random, Config config, SimpleFieldSet fs) {
SubConfig schedulerConfig = new SubConfig("node.scheduler",
config);
- this.nodeDir = node.nodeDir;
- persistTarget = new File(nodeDir, "throttle.dat");
- persistTemp = new File(nodeDir, "throttle.dat.tmp");
-
- SimpleFieldSet fs = null;
- try {
- fs = SimpleFieldSet.readFrom(persistTarget);
- } catch (IOException e) {
- try {
- fs = SimpleFieldSet.readFrom(persistTemp);
- } catch (FileNotFoundException e1) {
- // Ignore
- } catch (IOException e1) {
- Logger.error(this, "Could not read
"+persistTarget+" ("+e+") and could not read "+persistTemp+" either ("+e1+")");
- }
- }
-
throttleWindow = new ThrottleWindowManager(2.0, fs == null ?
null : fs.subset("ThrottleWindow"));
chkRequestThrottle = new MyRequestThrottle(throttleWindow,
5000, "CHK Request", fs == null ? null : fs.subset("CHKRequestThrottle"));
chkRequestStarter = new RequestStarter(core,
chkRequestThrottle, "CHK Request starter ("+portNumber+")",
node.requestOutputThrottle, node.requestInputThrottle,
node.localChkFetchBytesSentAverage, node.localChkFetchBytesReceivedAverage);
@@ -87,13 +59,6 @@
}
- public void start() {
- ThrottlePersister persister = new ThrottlePersister();
- Thread t = new Thread(persister, "Throttle data persister
thread");
- t.setDaemon(true);
- t.start();
- }
-
public class MyRequestThrottle implements BaseRequestThrottle {
private final BootstrappingDecayingRunningAverage
roundTripTime;
@@ -148,78 +113,10 @@
return sskInsertThrottle;
}
- // FIXME convert these kind of threads to Checkpointed's and implement
a handler
- // using the PacketSender/Ticker. Would save a few threads.
-
- class ThrottlePersister implements Runnable {
-
- public void run() {
- while(true) {
- try {
- persistThrottle();
- } catch (Throwable t) {
- Logger.error(this, "Caught "+t, t);
- }
- try {
- Thread.sleep(60*1000);
- } catch (InterruptedException e) {
- // Maybe it's time to wake up?
- }
- }
- }
-
- }
-
- public void persistThrottle() {
- SimpleFieldSet fs = persistToFieldSet();
- try {
- FileOutputStream fos = new
FileOutputStream(persistTemp);
- // FIXME common pattern, reuse it.
- BufferedOutputStream bos = new
BufferedOutputStream(fos);
- OutputStreamWriter osw = new OutputStreamWriter(bos,
"UTF-8");
- try {
- fs.writeTo(osw);
- } catch (IOException e) {
- try {
- fos.close();
- persistTemp.delete();
- return;
- } catch (IOException e1) {
- // Ignore
- }
- }
- try {
- osw.close();
- } catch (IOException e) {
- // Huh?
- Logger.error(this, "Caught while closing: "+e,
e);
- return;
- }
- // Try an atomic rename
- if(!persistTemp.renameTo(persistTarget)) {
- // Not supported on some systems (Windows)
- if(!persistTarget.delete()) {
- if(persistTarget.exists()) {
- Logger.error(this, "Could not
delete "+persistTarget+" - check permissions");
- }
- }
- if(!persistTemp.renameTo(persistTarget)) {
- Logger.error(this, "Could not rename
"+persistTemp+" to "+persistTarget+" - check permissions");
- }
- }
- } catch (FileNotFoundException e) {
- Logger.error(this, "Could not store throttle data to
disk: "+e, e);
- return;
- } catch (UnsupportedEncodingException e) {
- Logger.error(this, "Unsupported encoding: UTF-8 !!!!:
"+e, e);
- }
-
- }
-
/**
* Persist the throttle data to a SimpleFieldSet.
*/
- private SimpleFieldSet persistToFieldSet() {
+ SimpleFieldSet persistToFieldSet() {
SimpleFieldSet fs = new SimpleFieldSet();
fs.put("ThrottleWindow", throttleWindow.exportFieldSet());
fs.put("CHKRequestThrottle",
chkRequestThrottle.exportFieldSet());