Author: xor
Date: 2007-12-23 10:54:03 +0000 (Sun, 23 Dec 2007)
New Revision: 16802
Modified:
trunk/freenet/src/freenet/node/NodeStats.java
Log:
- test my SVN access :)
- workaround for bug 0001554: libgcj reports infinite maximum heap memory, this
broke the free memory calculation in shouldRejectRequest()
Modified: trunk/freenet/src/freenet/node/NodeStats.java
===================================================================
--- trunk/freenet/src/freenet/node/NodeStats.java 2007-12-23 00:28:47 UTC
(rev 16801)
+++ trunk/freenet/src/freenet/node/NodeStats.java 2007-12-23 10:54:03 UTC
(rev 16802)
@@ -1,1031 +1,1035 @@
-package freenet.node;
-
-import java.io.File;
-import java.text.DecimalFormat;
-import java.text.NumberFormat;
-import java.util.Arrays;
-import java.util.Comparator;
-
-import freenet.config.InvalidConfigValueException;
-import freenet.config.SubConfig;
-import freenet.crypt.RandomSource;
-import freenet.io.comm.DMT;
-import freenet.io.comm.IOStatisticCollector;
-import freenet.l10n.L10n;
-import freenet.support.HTMLNode;
-import freenet.support.Logger;
-import freenet.support.SimpleFieldSet;
-import freenet.support.SizeUtil;
-import freenet.support.StringCounter;
-import freenet.support.TimeUtil;
-import freenet.support.TokenBucket;
-import freenet.support.api.BooleanCallback;
-import freenet.support.api.IntCallback;
-import freenet.support.api.LongCallback;
-import freenet.support.math.BootstrappingDecayingRunningAverage;
-import freenet.support.math.RunningAverage;
-import freenet.support.math.TimeDecayingRunningAverage;
-import freenet.support.math.TrivialRunningAverage;
-
-/** Node (as opposed to NodeClientCore) level statistics. Includes
shouldRejectRequest(), but not limited
- * to stuff required to implement that. */
-public class NodeStats implements Persistable {
-
- /** Sub-max ping time. If ping is greater than this, we reject some
requests. */
- public static final long SUB_MAX_PING_TIME = 700;
- /** Maximum overall average ping time. If ping is greater than this,
- * we reject all requests. */
- public static final long MAX_PING_TIME = 1500;
- /** Maximum throttled packet delay. If the throttled packet delay is
greater
- * than this, reject all packets. */
- public static final long MAX_THROTTLE_DELAY = 3000;
- /** If the throttled packet delay is less than this, reject no packets;
if it's
- * between the two, reject some packets. */
- public static final long SUB_MAX_THROTTLE_DELAY = 2000;
- /** How high can bwlimitDelayTime be before we alert (in milliseconds)*/
- public static final long MAX_BWLIMIT_DELAY_TIME_ALERT_THRESHOLD =
MAX_THROTTLE_DELAY*2;
- /** How high can nodeAveragePingTime be before we alert (in
milliseconds)*/
- public static final long MAX_NODE_AVERAGE_PING_TIME_ALERT_THRESHOLD =
MAX_PING_TIME*2;
- /** How long we're over the bwlimitDelayTime threshold before we alert
(in milliseconds)*/
- public static final long MAX_BWLIMIT_DELAY_TIME_ALERT_DELAY =
10*60*1000; // 10 minutes
- /** How long we're over the nodeAveragePingTime threshold before we
alert (in milliseconds)*/
- public static final long MAX_NODE_AVERAGE_PING_TIME_ALERT_DELAY =
10*60*1000; // 10 minutes
- /** Accept one request every 10 seconds regardless, to ensure we update
the
- * block send time.
- */
- public static final int MAX_INTERREQUEST_TIME = 10*1000;
-
- /** Fudge factor for high level bandwidth limiting. FIXME should be a
long term running average */
- public static final double FRACTION_OF_BANDWIDTH_USED_BY_REQUESTS = 0.8;
-
- private final Node node;
- private MemoryChecker myMemoryChecker;
- public final PeerManager peers;
-
- final RandomSource hardRandom;
-
- private boolean logMINOR;
-
- /** first time bwlimitDelay was over PeerManagerUserAlert threshold */
- private long firstBwlimitDelayTimeThresholdBreak ;
- /** first time nodeAveragePing was over PeerManagerUserAlert threshold
*/
- private long firstNodeAveragePingTimeThresholdBreak;
- /** bwlimitDelay PeerManagerUserAlert should happen if true */
- public boolean bwlimitDelayAlertRelevant;
- /** nodeAveragePing PeerManagerUserAlert should happen if true */
- public boolean nodeAveragePingAlertRelevant;
- /** Average proportion of requests rejected immediately due to overload
*/
- public final TimeDecayingRunningAverage pInstantRejectIncoming;
-
- /** Average delay caused by throttling for sending a packet */
- final TimeDecayingRunningAverage throttledPacketSendAverage;
-
- // Bytes used by each different type of local/remote chk/ssk
request/insert
- final TimeDecayingRunningAverage remoteChkFetchBytesSentAverage;
- final TimeDecayingRunningAverage remoteSskFetchBytesSentAverage;
- final TimeDecayingRunningAverage remoteChkInsertBytesSentAverage;
- final TimeDecayingRunningAverage remoteSskInsertBytesSentAverage;
- final TimeDecayingRunningAverage remoteChkFetchBytesReceivedAverage;
- final TimeDecayingRunningAverage remoteSskFetchBytesReceivedAverage;
- final TimeDecayingRunningAverage remoteChkInsertBytesReceivedAverage;
- final TimeDecayingRunningAverage remoteSskInsertBytesReceivedAverage;
- final TimeDecayingRunningAverage localChkFetchBytesSentAverage;
- final TimeDecayingRunningAverage localSskFetchBytesSentAverage;
- final TimeDecayingRunningAverage localChkInsertBytesSentAverage;
- final TimeDecayingRunningAverage localSskInsertBytesSentAverage;
- final TimeDecayingRunningAverage localChkFetchBytesReceivedAverage;
- final TimeDecayingRunningAverage localSskFetchBytesReceivedAverage;
- final TimeDecayingRunningAverage localChkInsertBytesReceivedAverage;
- final TimeDecayingRunningAverage localSskInsertBytesReceivedAverage;
-
- // Bytes used by successful chk/ssk request/insert.
- // Note: These are used to determine whether to accept a request,
- // hence they should be roughly representative of incoming - NOT LOCAL -
- // requests. Therefore, while we DO report local successful requests,
- // we only report the portion which will be consistent with a remote
- // request. If there is both a Handler and a Sender, it's a remote
- // request, report both. If there is only a Sender, report only the
- // received bytes (for a request). Etc.
-
- // Note that these are always reported in the Handler or the
NodeClientCore
- // call taking its place.
- final TimeDecayingRunningAverage successfulChkFetchBytesSentAverage;
- final TimeDecayingRunningAverage successfulSskFetchBytesSentAverage;
- final TimeDecayingRunningAverage successfulChkInsertBytesSentAverage;
- final TimeDecayingRunningAverage successfulSskInsertBytesSentAverage;
- final TimeDecayingRunningAverage successfulChkFetchBytesReceivedAverage;
- final TimeDecayingRunningAverage successfulSskFetchBytesReceivedAverage;
- final TimeDecayingRunningAverage
successfulChkInsertBytesReceivedAverage;
- final TimeDecayingRunningAverage
successfulSskInsertBytesReceivedAverage;
-
- final TrivialRunningAverage globalFetchPSuccess;
- final TrivialRunningAverage chkFetchPSuccess;
- final TrivialRunningAverage sskFetchPSuccess;
- final TrivialRunningAverage localFetchPSuccess;
- final TrivialRunningAverage remoteFetchPSuccess;
-
- File persistTarget;
- File persistTemp;
- private long previous_input_stat;
- private long previous_output_stat;
- private long previous_io_stat_time;
- private long last_input_stat;
- private long last_output_stat;
- private long last_io_stat_time;
- private final Object ioStatSync = new Object();
- /** Next time to update the node I/O stats */
- private long nextNodeIOStatsUpdateTime = -1;
- /** Node I/O stats update interval (milliseconds) */
- private static final long nodeIOStatsUpdateInterval = 2000;
-
- /** Token bucket for output bandwidth used by requests */
- final TokenBucket requestOutputThrottle;
- /** Token bucket for input bandwidth used by requests */
- final TokenBucket requestInputThrottle;
-
- // various metrics
- public final RunningAverage routingMissDistance;
- public final RunningAverage backedOffPercent;
- public final RunningAverage avgCacheLocation;
- public final RunningAverage avgStoreLocation;
- public final RunningAverage avgCacheSuccess;
- public final RunningAverage avgStoreSuccess;
- // FIXME: does furthest{Store,Cache}Success need to be synchronized?
- public double furthestCacheSuccess=0.0;
- public double furthestStoreSuccess=0.0;
- protected final Persister persister;
-
- protected final RunningAverage avgRequestLocation;
-
- // ThreadCounting stuffs
- public final ThreadGroup rootThreadGroup;
- private int threadLimit;
-
- // Free heap memory threshold stuffs
- private long freeHeapBytesThreshold;
- private int freeHeapPercentThreshold;
-
- final NodePinger nodePinger;
-
- final StringCounter preemptiveRejectReasons;
- final StringCounter localPreemptiveRejectReasons;
-
- // Enable this if you run into hard to debug OOMs.
- // Disabled to prevent long pauses every 30 seconds.
- private int aggressiveGCModificator = -1 /*250*/;
-
- // Peers stats
- /** Next time to update PeerManagerUserAlert stats */
- private long nextPeerManagerUserAlertStatsUpdateTime = -1;
- /** PeerManagerUserAlert stats update interval (milliseconds) */
- private static final long peerManagerUserAlertStatsUpdateInterval =
1000; // 1 second
-
- NodeStats(Node node, int sortOrder, SubConfig statsConfig,
SimpleFieldSet oldThrottleFS, int obwLimit, int ibwLimit) throws
NodeInitException {
- logMINOR = Logger.shouldLog(Logger.MINOR, this);
- this.node = node;
- this.peers = node.peers;
- this.hardRandom = node.random;
- this.routingMissDistance = new TimeDecayingRunningAverage(0.0,
180000, 0.0, 1.0, node);
- this.backedOffPercent = new TimeDecayingRunningAverage(0.0,
180000, 0.0, 1.0, node);
- double nodeLoc=node.lm.getLocation();
- // FIXME PLEASE; (int) casts; (maxCacheKeys>MAXINT?) This value
will probably end up being a small constant anyway (200?).
- this.avgCacheLocation = new
BootstrappingDecayingRunningAverage(nodeLoc, 0.0, 1.0, (int)node.maxCacheKeys,
null);
- this.avgStoreLocation = new
BootstrappingDecayingRunningAverage(nodeLoc, 0.0, 1.0, (int)node.maxStoreKeys,
null);
- this.avgCacheSuccess = new
BootstrappingDecayingRunningAverage(nodeLoc, 0.0, 1.0, 10000, null);
- this.avgStoreSuccess = new
BootstrappingDecayingRunningAverage(nodeLoc, 0.0, 1.0, 10000, null);
- this.avgRequestLocation = new
BootstrappingDecayingRunningAverage(nodeLoc, 0.0, 1.0, 10000, null);
- preemptiveRejectReasons = new StringCounter();
- localPreemptiveRejectReasons = new StringCounter();
- pInstantRejectIncoming = new TimeDecayingRunningAverage(0,
60000, 0.0, 1.0, node);
- ThreadGroup tg = Thread.currentThread().getThreadGroup();
- while(tg.getParent() != null) tg = tg.getParent();
- this.rootThreadGroup = tg;
- throttledPacketSendAverage =
- new TimeDecayingRunningAverage(1, 10*60*1000 /* should
be significantly longer than a typical transfer */, 0, Long.MAX_VALUE, node);
- nodePinger = new NodePinger(node);
-
- previous_input_stat = 0;
- previous_output_stat = 0;
- previous_io_stat_time = 1;
- last_input_stat = 0;
- last_output_stat = 0;
- last_io_stat_time = 3;
-
- statsConfig.register("threadLimit", 500, sortOrder++, true,
true, "NodeStat.threadLimit", "NodeStat.threadLimitLong",
- new IntCallback() {
- public int get() {
- return threadLimit;
- }
- public void set(int val) throws
InvalidConfigValueException {
- if(val == get()) return;
- if(val < 100)
- throw new
InvalidConfigValueException(l10n("valueTooLow"));
- threadLimit = val;
- }
- });
- threadLimit = statsConfig.getInt("threadLimit");
-
- // Yes it could be in seconds insteed of multiples of 0.12, but
we don't want people to play with it :)
- statsConfig.register("aggressiveGC", aggressiveGCModificator,
sortOrder++, true, false, "NodeStat.aggressiveGC", "NodeStat.aggressiveGCLong",
- new IntCallback() {
- public int get() {
- return aggressiveGCModificator;
- }
- public void set(int val) throws
InvalidConfigValueException {
- if(val == get()) return;
- Logger.normal(this, "Changing
aggressiveGCModificator to "+val);
- aggressiveGCModificator = val;
- }
- });
- aggressiveGCModificator = statsConfig.getInt("aggressiveGC");
-
- myMemoryChecker = new MemoryChecker(node.ps,
aggressiveGCModificator);
- statsConfig.register("memoryChecker", true, sortOrder++, true,
false, "NodeStat.memCheck", "NodeStat.memCheckLong",
- new BooleanCallback(){
- public boolean get() {
- return
myMemoryChecker.isRunning();
- }
-
- public void set(boolean val) throws
InvalidConfigValueException {
- if(val == get()) return;
-
- if(val)
- myMemoryChecker.start();
- else
-
myMemoryChecker.terminate();
- }
- });
- if(statsConfig.getBoolean("memoryChecker"))
- myMemoryChecker.start();
-
- statsConfig.register("freeHeapBytesThreshold", "5M",
sortOrder++, true, true, "NodeStat.freeHeapBytesThreshold",
"NodeStat.freeHeapBytesThresholdLong",
- new LongCallback() {
- public long get() {
- return freeHeapBytesThreshold;
- }
- public void set(long val) throws
InvalidConfigValueException {
- if(val == get()) return;
- if(val < 0)
- throw new
InvalidConfigValueException(l10n("valueTooLow"));
- freeHeapBytesThreshold = val;
- }
- });
- freeHeapBytesThreshold =
statsConfig.getLong("freeHeapBytesThreshold");
-
- statsConfig.register("freeHeapPercentThreshold", "5",
sortOrder++, true, true, "NodeStat.freeHeapPercentThreshold",
"NodeStat.freeHeapPercentThresholdLong",
- new IntCallback() {
- public int get() {
- return freeHeapPercentThreshold;
- }
- public void set(int val) throws
InvalidConfigValueException {
- if(val == get()) return;
- if(val < 0 || val >= 100)
- throw new
InvalidConfigValueException(l10n("mustBePercentValueNotFull"));
- freeHeapPercentThreshold = val;
- }
- });
- freeHeapPercentThreshold =
statsConfig.getInt("freeHeapPercentThreshold");
-
- persister = new ConfigurablePersister(this, statsConfig,
"nodeThrottleFile", "node-throttle.dat", sortOrder++, true, false,
- "NodeStat.statsPersister",
"NodeStat.statsPersisterLong", node.ps);
-
- SimpleFieldSet throttleFS = persister.read();
-
- if(throttleFS == null)
- throttleFS = oldThrottleFS;
-
- if(logMINOR) Logger.minor(this, "Read
throttleFS:\n"+throttleFS);
-
- // Guesstimates. Hopefully well over the reality.
- localChkFetchBytesSentAverage = new
TimeDecayingRunningAverage(500, 180000, 0.0, 1024*1024*1024, throttleFS == null
? null : throttleFS.subset("LocalChkFetchBytesSentAverage"), node);
- localSskFetchBytesSentAverage = new
TimeDecayingRunningAverage(500, 180000, 0.0, 1024*1024*1024, throttleFS == null
? null : throttleFS.subset("LocalSskFetchBytesSentAverage"), node);
- localChkInsertBytesSentAverage = new
TimeDecayingRunningAverage(32768, 180000, 0.0, 1024*1024*1024, throttleFS ==
null ? null : throttleFS.subset("LocalChkInsertBytesSentAverage"), node);
- localSskInsertBytesSentAverage = new
TimeDecayingRunningAverage(2048, 180000, 0.0, 1024*1024*1024, throttleFS ==
null ? null : throttleFS.subset("LocalSskInsertBytesSentAverage"), node);
- localChkFetchBytesReceivedAverage = new
TimeDecayingRunningAverage(32768+2048/*path folding*/, 180000, 0.0,
1024*1024*1024, throttleFS == null ? null :
throttleFS.subset("LocalChkFetchBytesReceivedAverage"), node);
- localSskFetchBytesReceivedAverage = new
TimeDecayingRunningAverage(2048, 180000, 0.0, 1024*1024*1024, throttleFS ==
null ? null : throttleFS.subset("LocalSskFetchBytesReceivedAverage"), node);
- localChkInsertBytesReceivedAverage = new
TimeDecayingRunningAverage(1024, 180000, 0.0, 1024*1024*1024, throttleFS ==
null ? null : throttleFS.subset("LocalChkInsertBytesReceivedAverage"), node);
- localSskInsertBytesReceivedAverage = new
TimeDecayingRunningAverage(500, 180000, 0.0, 1024*1024*1024, throttleFS == null
? null : throttleFS.subset("LocalChkInsertBytesReceivedAverage"), node);
-
- remoteChkFetchBytesSentAverage = new
TimeDecayingRunningAverage(32768+1024+500+2048/*path folding*/, 180000, 0.0,
1024*1024*1024, throttleFS == null ? null :
throttleFS.subset("RemoteChkFetchBytesSentAverage"), node);
- remoteSskFetchBytesSentAverage = new
TimeDecayingRunningAverage(1024+1024+500, 180000, 0.0, 1024*1024*1024,
throttleFS == null ? null :
throttleFS.subset("RemoteSskFetchBytesSentAverage"), node);
- remoteChkInsertBytesSentAverage = new
TimeDecayingRunningAverage(32768+32768+1024, 180000, 0.0, 1024*1024*1024,
throttleFS == null ? null :
throttleFS.subset("RemoteChkInsertBytesSentAverage"), node);
- remoteSskInsertBytesSentAverage = new
TimeDecayingRunningAverage(1024+1024+500, 180000, 0.0, 1024*1024*1024,
throttleFS == null ? null :
throttleFS.subset("RemoteSskInsertBytesSentAverage"), node);
- remoteChkFetchBytesReceivedAverage = new
TimeDecayingRunningAverage(32768+1024+500+2048/*path folding*/, 180000, 0.0,
1024*1024*1024, throttleFS == null ? null :
throttleFS.subset("RemoteChkFetchBytesReceivedAverage"), node);
- remoteSskFetchBytesReceivedAverage = new
TimeDecayingRunningAverage(2048+500, 180000, 0.0, 1024*1024*1024, throttleFS ==
null ? null : throttleFS.subset("RemoteSskFetchBytesReceivedAverage"), node);
- remoteChkInsertBytesReceivedAverage = new
TimeDecayingRunningAverage(32768+1024+500, 180000, 0.0, 1024*1024*1024,
throttleFS == null ? null :
throttleFS.subset("RemoteChkInsertBytesReceivedAverage"), node);
- remoteSskInsertBytesReceivedAverage = new
TimeDecayingRunningAverage(1024+1024+500, 180000, 0.0, 1024*1024*1024,
throttleFS == null ? null :
throttleFS.subset("RemoteSskInsertBytesReceivedAverage"), node);
-
- successfulChkFetchBytesSentAverage = new
TimeDecayingRunningAverage(32768+1024+500+2048/*path folding*/, 180000, 0.0,
1024*1024*1024, throttleFS == null ? null :
throttleFS.subset("SuccessfulChkFetchBytesSentAverage"), node);
- successfulSskFetchBytesSentAverage = new
TimeDecayingRunningAverage(1024+1024+500, 180000, 0.0, 1024*1024*1024,
throttleFS == null ? null :
throttleFS.subset("SuccessfulSskFetchBytesSentAverage"), node);
- successfulChkInsertBytesSentAverage = new
TimeDecayingRunningAverage(32768+32768+1024, 180000, 0.0, 1024*1024*1024,
throttleFS == null ? null :
throttleFS.subset("SuccessfulChkInsertBytesSentAverage"), node);
- successfulSskInsertBytesSentAverage = new
TimeDecayingRunningAverage(1024+1024+500, 180000, 0.0, 1024*1024*1024,
throttleFS == null ? null :
throttleFS.subset("SuccessfulSskInsertBytesSentAverage"), node);
- successfulChkFetchBytesReceivedAverage = new
TimeDecayingRunningAverage(32768+1024+500+2048/*path folding*/, 180000, 0.0,
1024*1024*1024, throttleFS == null ? null :
throttleFS.subset("SuccessfulChkFetchBytesReceivedAverage"), node);
- successfulSskFetchBytesReceivedAverage = new
TimeDecayingRunningAverage(2048+500, 180000, 0.0, 1024*1024*1024, throttleFS ==
null ? null : throttleFS.subset("SuccessfulSskFetchBytesReceivedAverage"),
node);
- successfulChkInsertBytesReceivedAverage = new
TimeDecayingRunningAverage(32768+1024+500, 180000, 0.0, 1024*1024*1024,
throttleFS == null ? null :
throttleFS.subset("SuccessfulChkInsertBytesReceivedAverage"), node);
- successfulSskInsertBytesReceivedAverage = new
TimeDecayingRunningAverage(1024+1024+500, 180000, 0.0, 1024*1024*1024,
throttleFS == null ? null :
throttleFS.subset("SuccessfulSskInsertBytesReceivedAverage"), node);
-
- globalFetchPSuccess = new TrivialRunningAverage();
- chkFetchPSuccess = new TrivialRunningAverage();
- sskFetchPSuccess = new TrivialRunningAverage();
- localFetchPSuccess = new TrivialRunningAverage();
- remoteFetchPSuccess = new TrivialRunningAverage();
-
- requestOutputThrottle =
- new TokenBucket(Math.max(obwLimit*60, 32768*20),
(int)((1000L*1000L*1000L) / (obwLimit *
FRACTION_OF_BANDWIDTH_USED_BY_REQUESTS)), 0);
- requestInputThrottle =
- new TokenBucket(Math.max(ibwLimit*60, 32768*20),
(int)((1000L*1000L*1000L) / (ibwLimit *
FRACTION_OF_BANDWIDTH_USED_BY_REQUESTS)), 0);
-
- estimatedSizeOfOneThrottledPacket = 1024 +
DMT.packetTransmitSize(1024, 32) +
- node.estimateFullHeadersLengthOneMessage();
- }
-
- protected String l10n(String key) {
- return L10n.getString("NodeStats."+key);
- }
-
- public void start() throws NodeInitException {
- nodePinger.start();
- persister.start();
-
node.getTicker().queueTimedJob(throttledPacketSendAverageIdleUpdater,
CHECK_THROTTLE_TIME);
- }
-
- /** Every 60 seconds, check whether we need to adjust the bandwidth
delay time because of idleness.
- * (If no packets have been sent, the throttledPacketSendAverage should
decrease; if it doesn't, it may go high,
- * and then no requests will be accepted, and it will stay high
forever. */
- static final int CHECK_THROTTLE_TIME = 60 * 1000;
- /** Absolute limit of 4MB queued to any given peer. FIXME make this
configurable.
- * Note that for many MessageItem's, the actual memory usage will be
significantly more than this figure. */
- private static final long MAX_PEER_QUEUE_BYTES = 4 * 1024 * 1024;
- /** Don't accept requests if it'll take more than 10 minutes to send
the current message queue */
- private static final double MAX_PEER_QUEUE_TIME = 10 * 60 * 1000.0;
-
- private long lastAcceptedRequest = -1;
-
- final int estimatedSizeOfOneThrottledPacket;
-
- final Runnable throttledPacketSendAverageIdleUpdater =
- new Runnable() {
- public void run() {
- long now = System.currentTimeMillis();
- try {
-
if(throttledPacketSendAverage.lastReportTime() < now - 5000) { // if last
report more than 5 seconds ago
- // shouldn't take long
-
node.outputThrottle.blockingGrab(estimatedSizeOfOneThrottledPacket);
-
node.outputThrottle.recycle(estimatedSizeOfOneThrottledPacket);
- long after =
System.currentTimeMillis();
- // Report time it takes to grab
the bytes.
-
throttledPacketSendAverage.report(after - now);
- }
- } catch (Throwable t) {
- Logger.error(this, "Caught "+t, t);
- } finally {
- node.getTicker().queueTimedJob(this,
CHECK_THROTTLE_TIME);
- long end = System.currentTimeMillis();
- if(logMINOR)
- Logger.minor(this, "Throttle
check took "+TimeUtil.formatTime(end-now,2,true));
- }
- }
- };
-
- /* return reject reason as string if should reject, otherwise return
null */
- public String shouldRejectRequest(boolean canAcceptAnyway, boolean
isInsert, boolean isSSK, boolean isLocal, PeerNode source) {
- logMINOR = Logger.shouldLog(Logger.MINOR, this);
- if(logMINOR) dumpByteCostAverages();
-
- int threadCount = getActiveThreadCount();
- if(threadLimit < threadCount) {
- pInstantRejectIncoming.report(1.0);
- rejected(">threadLimit", isLocal);
- return ">threadLimit ("+threadCount+'/'+threadLimit+')';
- }
-
- double bwlimitDelayTime =
throttledPacketSendAverage.currentValue();
-
- // If no recent reports, no packets have been sent; correct the
average downwards.
- long now = System.currentTimeMillis();
- double pingTime = nodePinger.averagePingTime();
- synchronized(this) {
- // Round trip time
- if(pingTime > MAX_PING_TIME) {
- if((now - lastAcceptedRequest >
MAX_INTERREQUEST_TIME) && canAcceptAnyway) {
- if(logMINOR) Logger.minor(this,
"Accepting request anyway (take one every 10 secs to keep bwlimitDelayTime
updated)");
- } else {
- pInstantRejectIncoming.report(1.0);
- rejected(">MAX_PING_TIME", isLocal);
- return ">MAX_PING_TIME
("+TimeUtil.formatTime((long)pingTime, 2, true)+ ')';
- }
- } else if(pingTime > SUB_MAX_PING_TIME) {
- double x = ((double)(pingTime -
SUB_MAX_PING_TIME)) / (MAX_PING_TIME - SUB_MAX_PING_TIME);
- if(hardRandom.nextDouble() < x) {
- pInstantRejectIncoming.report(1.0);
- rejected(">SUB_MAX_PING_TIME", isLocal);
- return ">SUB_MAX_PING_TIME
("+TimeUtil.formatTime((long)pingTime, 2, true)+ ')';
- }
- }
-
- // Bandwidth limited packets
- if(bwlimitDelayTime > MAX_THROTTLE_DELAY) {
- if((now - lastAcceptedRequest >
MAX_INTERREQUEST_TIME) && canAcceptAnyway) {
- if(logMINOR) Logger.minor(this,
"Accepting request anyway (take one every 10 secs to keep bwlimitDelayTime
updated)");
- } else {
- pInstantRejectIncoming.report(1.0);
- rejected(">MAX_THROTTLE_DELAY",
isLocal);
- return ">MAX_THROTTLE_DELAY
("+TimeUtil.formatTime((long)bwlimitDelayTime, 2, true)+ ')';
- }
- } else if(bwlimitDelayTime > SUB_MAX_THROTTLE_DELAY) {
- double x = ((double)(bwlimitDelayTime -
SUB_MAX_THROTTLE_DELAY)) / (MAX_THROTTLE_DELAY - SUB_MAX_THROTTLE_DELAY);
- if(hardRandom.nextDouble() < x) {
- pInstantRejectIncoming.report(1.0);
- rejected(">SUB_MAX_THROTTLE_DELAY",
isLocal);
- return ">SUB_MAX_THROTTLE_DELAY
("+TimeUtil.formatTime((long)bwlimitDelayTime, 2, true)+ ')';
- }
- }
-
- }
-
- // Successful cluster timeout protection.
- // Reject request if the result of all our current requests
completing simultaneously would be that
- // some of them timeout.
-
- // Never reject a CHK and accept an SSK. Because if we do that,
we would be constantly accepting SSKs, as there
- // would never be enough space for a CHK. So we add 1 to each
type of request's count before computing the
- // bandwidth liability. Thus, if we have exactly enough space
for 1 SSK and 1 CHK, we can accept either, and
- // when one of either type completes, we can accept one of
either type again: We never let SSKs drain the
- // "bucket" and block CHKs.
-
- int numCHKRequests = node.getNumCHKRequests() + 1;
- int numSSKRequests = node.getNumSSKRequests() + 1;
- int numCHKInserts = node.getNumCHKInserts() + 1;
- int numSSKInserts = node.getNumSSKInserts() + 1;
- if(logMINOR)
- Logger.minor(this, "Running (adjusted): CHK fetch
"+numCHKRequests+" SSK fetch "+numSSKRequests+" CHK insert "+numCHKInserts+"
SSK insert "+numSSKInserts);
-
- double bandwidthLiabilityOutput =
- successfulChkFetchBytesSentAverage.currentValue() *
numCHKRequests +
- successfulSskFetchBytesSentAverage.currentValue() *
numSSKRequests +
- successfulChkInsertBytesSentAverage.currentValue() *
numCHKInserts +
- successfulSskInsertBytesSentAverage.currentValue() *
numSSKInserts;
- double bandwidthAvailableOutput =
- node.getOutputBandwidthLimit() * 90; // 90 seconds at
full power; we have to leave some time for the search as well
- bandwidthAvailableOutput *=
NodeStats.FRACTION_OF_BANDWIDTH_USED_BY_REQUESTS;
- if(bandwidthLiabilityOutput > bandwidthAvailableOutput) {
- pInstantRejectIncoming.report(1.0);
- rejected("Output bandwidth liability", isLocal);
- return "Output bandwidth liability
("+bandwidthLiabilityOutput+" > "+bandwidthAvailableOutput+")";
- }
-
- double bandwidthLiabilityInput =
- successfulChkFetchBytesReceivedAverage.currentValue() *
numCHKRequests +
- successfulSskFetchBytesReceivedAverage.currentValue() *
numSSKRequests +
- successfulChkInsertBytesReceivedAverage.currentValue()
* numCHKInserts +
- successfulSskInsertBytesReceivedAverage.currentValue()
* numSSKInserts;
- double bandwidthAvailableInput =
- node.getInputBandwidthLimit() * 90; // 90 seconds at
full power
- bandwidthAvailableInput *=
NodeStats.FRACTION_OF_BANDWIDTH_USED_BY_REQUESTS;
- if(bandwidthLiabilityInput > bandwidthAvailableInput) {
- pInstantRejectIncoming.report(1.0);
- rejected("Input bandwidth liability", isLocal);
- return "Input bandwidth liability
("+bandwidthLiabilityInput+" > "+bandwidthAvailableInput+")";
- }
-
-
- // Do we have the bandwidth?
- double expected = this.getThrottle(isLocal, isInsert, isSSK,
true).currentValue();
- int expectedSent = (int)Math.max(expected, 0);
- if(logMINOR)
- Logger.minor(this, "Expected sent bytes:
"+expectedSent);
- if(!requestOutputThrottle.instantGrab(expectedSent)) {
- pInstantRejectIncoming.report(1.0);
- rejected("Insufficient output bandwidth", isLocal);
- return "Insufficient output bandwidth";
- }
- expected = this.getThrottle(isLocal, isInsert, isSSK,
false).currentValue();
- int expectedReceived = (int)Math.max(expected, 0);
- if(logMINOR)
- Logger.minor(this, "Expected received bytes:
"+expectedSent);
- if(!requestInputThrottle.instantGrab(expectedReceived)) {
- requestOutputThrottle.recycle(expectedSent);
- pInstantRejectIncoming.report(1.0);
- rejected("Insufficient input bandwidth", isLocal);
- return "Insufficient input bandwidth";
- }
-
- Runtime r = Runtime.getRuntime();
- long maxHeapMemory = r.maxMemory();
- long totalHeapMemory = r.totalMemory();
- long freeHeapMemory = r.freeMemory();
- if(maxHeapMemory < Long.MAX_VALUE) { // would mean unlimited
- freeHeapMemory = maxHeapMemory - (totalHeapMemory -
freeHeapMemory);
- }
- if(freeHeapMemory < freeHeapBytesThreshold) {
- pInstantRejectIncoming.report(1.0);
- rejected("<freeHeapBytesThreshold", isLocal);
- return "<freeHeapBytesThreshold
("+SizeUtil.formatSize(freeHeapMemory, false)+" of
"+SizeUtil.formatSize(maxHeapMemory, false)+')';
- }
- double percentFreeHeapMemoryOfMax = ((double) freeHeapMemory) /
((double) maxHeapMemory);
- double freeHeapPercentThresholdDouble = ((double)
freeHeapPercentThreshold) / ((double) 100);
- if(percentFreeHeapMemoryOfMax < freeHeapPercentThresholdDouble)
{
- pInstantRejectIncoming.report(1.0);
- DecimalFormat fix3p1pct = new DecimalFormat("##0.0%");
- rejected("<freeHeapPercentThreshold", isLocal);
- return "<freeHeapPercentThreshold
("+SizeUtil.formatSize(freeHeapMemory, false)+" of
"+SizeUtil.formatSize(maxHeapMemory, false)+"
("+fix3p1pct.format(percentFreeHeapMemoryOfMax)+"))";
- }
-
- if(source != null) {
- long queuedBytes = source.getMessageQueueLengthBytes();
- if(queuedBytes > MAX_PEER_QUEUE_BYTES)
- return "Too many message bytes queued for peer";
- if(queuedBytes /
(source.getThrottle().getBandwidth()+1.0) > MAX_PEER_QUEUE_TIME)
- return "Peer's queue will take too long to
transfer";
- }
-
- synchronized(this) {
- if(logMINOR) Logger.minor(this, "Accepting request?");
- lastAcceptedRequest = now;
- }
-
- pInstantRejectIncoming.report(0.0);
-
- // Accept
- return null;
- }
-
- private void rejected(String reason, boolean isLocal) {
- if(!isLocal) preemptiveRejectReasons.inc(reason);
- else this.localPreemptiveRejectReasons.inc(reason);
- }
-
- private RunningAverage getThrottle(boolean isLocal, boolean isInsert,
boolean isSSK, boolean isSent) {
- if(isLocal) {
- if(isInsert) {
- if(isSSK) {
- return isSent ?
this.localSskInsertBytesSentAverage : this.localSskInsertBytesReceivedAverage;
- } else {
- return isSent ?
this.localChkInsertBytesSentAverage : this.localChkInsertBytesReceivedAverage;
- }
- } else {
- if(isSSK) {
- return isSent ?
this.localSskFetchBytesSentAverage : this.localSskFetchBytesReceivedAverage;
- } else {
- return isSent ?
this.localChkFetchBytesSentAverage : this.localChkFetchBytesReceivedAverage;
- }
- }
- } else {
- if(isInsert) {
- if(isSSK) {
- return isSent ?
this.remoteSskInsertBytesSentAverage : this.remoteSskInsertBytesReceivedAverage;
- } else {
- return isSent ?
this.remoteChkInsertBytesSentAverage : this.remoteChkInsertBytesReceivedAverage;
- }
- } else {
- if(isSSK) {
- return isSent ?
this.remoteSskFetchBytesSentAverage : this.remoteSskFetchBytesReceivedAverage;
- } else {
- return isSent ?
this.remoteChkFetchBytesSentAverage : this.remoteChkFetchBytesReceivedAverage;
- }
- }
- }
- }
-
- private void dumpByteCostAverages() {
- Logger.minor(this, "Byte cost averages: REMOTE:"+
- " CHK insert
"+remoteChkInsertBytesSentAverage.currentValue()+ '/'
+remoteChkInsertBytesReceivedAverage.currentValue()+
- " SSK insert
"+remoteSskInsertBytesSentAverage.currentValue()+ '/'
+remoteSskInsertBytesReceivedAverage.currentValue()+
- " CHK fetch
"+remoteChkFetchBytesSentAverage.currentValue()+ '/'
+remoteChkFetchBytesReceivedAverage.currentValue()+
- " SSK fetch
"+remoteSskFetchBytesSentAverage.currentValue()+ '/'
+remoteSskFetchBytesReceivedAverage.currentValue());
- Logger.minor(this, "Byte cost averages: LOCAL:"+
- " CHK insert
"+localChkInsertBytesSentAverage.currentValue()+ '/'
+localChkInsertBytesReceivedAverage.currentValue()+
- " SSK insert
"+localSskInsertBytesSentAverage.currentValue()+ '/'
+localSskInsertBytesReceivedAverage.currentValue()+
- " CHK fetch
"+localChkFetchBytesSentAverage.currentValue()+ '/'
+localChkFetchBytesReceivedAverage.currentValue()+
- " SSK fetch
"+localSskFetchBytesSentAverage.currentValue()+ '/'
+localSskFetchBytesReceivedAverage.currentValue());
- Logger.minor(this, "Byte cost averages: SUCCESSFUL:"+
- " CHK insert
"+successfulChkInsertBytesSentAverage.currentValue()+ '/'
+successfulChkInsertBytesReceivedAverage.currentValue()+
- " SSK insert
"+successfulSskInsertBytesSentAverage.currentValue()+ '/'
+successfulSskInsertBytesReceivedAverage.currentValue()+
- " CHK fetch
"+successfulChkFetchBytesSentAverage.currentValue()+ '/'
+successfulChkFetchBytesReceivedAverage.currentValue()+
- " SSK fetch
"+successfulSskFetchBytesSentAverage.currentValue()+ '/'
+successfulSskFetchBytesReceivedAverage.currentValue());
-
- }
-
- public double getBwlimitDelayTime() {
- return throttledPacketSendAverage.currentValue();
- }
-
- public double getNodeAveragePingTime() {
- return nodePinger.averagePingTime();
- }
-
- public int getNetworkSizeEstimate(long timestamp) {
- return node.lm.getNetworkSizeEstimate( timestamp );
- }
-
- public Object[] getKnownLocations(long timestamp) {
- return node.lm.getKnownLocations( timestamp );
- }
-
- public double pRejectIncomingInstantly() {
- return pInstantRejectIncoming.currentValue();
- }
-
- /**
- * Update peerManagerUserAlertStats if the timer has expired.
- * Only called from PacketSender so doesn't need sync.
- */
- public void maybeUpdatePeerManagerUserAlertStats(long now) {
- if(now > nextPeerManagerUserAlertStatsUpdateTime) {
- if(getBwlimitDelayTime() >
MAX_BWLIMIT_DELAY_TIME_ALERT_THRESHOLD) {
- if(firstBwlimitDelayTimeThresholdBreak == 0) {
- firstBwlimitDelayTimeThresholdBreak =
now;
- }
- } else {
- firstBwlimitDelayTimeThresholdBreak = 0;
- }
- if((firstBwlimitDelayTimeThresholdBreak != 0) && ((now
- firstBwlimitDelayTimeThresholdBreak) >= MAX_BWLIMIT_DELAY_TIME_ALERT_DELAY)) {
- bwlimitDelayAlertRelevant = true;
- } else {
- bwlimitDelayAlertRelevant = false;
- }
- if(getNodeAveragePingTime() >
MAX_NODE_AVERAGE_PING_TIME_ALERT_THRESHOLD) {
- if(firstNodeAveragePingTimeThresholdBreak == 0)
{
- firstNodeAveragePingTimeThresholdBreak
= now;
- }
- } else {
- firstNodeAveragePingTimeThresholdBreak = 0;
- }
- if((firstNodeAveragePingTimeThresholdBreak != 0) &&
((now - firstNodeAveragePingTimeThresholdBreak) >=
MAX_NODE_AVERAGE_PING_TIME_ALERT_DELAY)) {
- nodeAveragePingAlertRelevant = true;
- } else {
- nodeAveragePingAlertRelevant = false;
- }
- if(logMINOR && Logger.shouldLog(Logger.DEBUG, this))
Logger.debug(this, "mUPMUAS: "+now+": "+getBwlimitDelayTime()+" >?
"+MAX_BWLIMIT_DELAY_TIME_ALERT_THRESHOLD+" since
"+firstBwlimitDelayTimeThresholdBreak+" ("+bwlimitDelayAlertRelevant+")
"+getNodeAveragePingTime()+" >? "+MAX_NODE_AVERAGE_PING_TIME_ALERT_THRESHOLD+"
since "+firstNodeAveragePingTimeThresholdBreak+"
("+nodeAveragePingAlertRelevant+ ')');
- nextPeerManagerUserAlertStatsUpdateTime = now +
peerManagerUserAlertStatsUpdateInterval;
- }
- }
-
- public SimpleFieldSet persistThrottlesToFieldSet() {
- SimpleFieldSet fs = new SimpleFieldSet(true);
- fs.put("RemoteChkFetchBytesSentAverage",
remoteChkFetchBytesSentAverage.exportFieldSet(true));
- fs.put("RemoteSskFetchBytesSentAverage",
remoteSskFetchBytesSentAverage.exportFieldSet(true));
- fs.put("RemoteChkInsertBytesSentAverage",
remoteChkInsertBytesSentAverage.exportFieldSet(true));
- fs.put("RemoteSskInsertBytesSentAverage",
remoteSskInsertBytesSentAverage.exportFieldSet(true));
- fs.put("RemoteChkFetchBytesReceivedAverage",
remoteChkFetchBytesReceivedAverage.exportFieldSet(true));
- fs.put("RemoteSskFetchBytesReceivedAverage",
remoteSskFetchBytesReceivedAverage.exportFieldSet(true));
- fs.put("RemoteChkInsertBytesReceivedAverage",
remoteChkInsertBytesReceivedAverage.exportFieldSet(true));
- fs.put("RemoteSskInsertBytesReceivedAverage",
remoteSskInsertBytesReceivedAverage.exportFieldSet(true));
- fs.put("LocalChkFetchBytesSentAverage",
localChkFetchBytesSentAverage.exportFieldSet(true));
- fs.put("LocalSskFetchBytesSentAverage",
localSskFetchBytesSentAverage.exportFieldSet(true));
- fs.put("LocalChkInsertBytesSentAverage",
localChkInsertBytesSentAverage.exportFieldSet(true));
- fs.put("LocalSskInsertBytesSentAverage",
localSskInsertBytesSentAverage.exportFieldSet(true));
- fs.put("LocalChkFetchBytesReceivedAverage",
localChkFetchBytesReceivedAverage.exportFieldSet(true));
- fs.put("LocalSskFetchBytesReceivedAverage",
localSskFetchBytesReceivedAverage.exportFieldSet(true));
- fs.put("LocalChkInsertBytesReceivedAverage",
localChkInsertBytesReceivedAverage.exportFieldSet(true));
- fs.put("LocalSskInsertBytesReceivedAverage",
localSskInsertBytesReceivedAverage.exportFieldSet(true));
- fs.put("SuccessfulChkFetchBytesSentAverage",
successfulChkFetchBytesSentAverage.exportFieldSet(true));
- fs.put("SuccessfulSskFetchBytesSentAverage",
successfulSskFetchBytesSentAverage.exportFieldSet(true));
- fs.put("SuccessfulChkInsertBytesSentAverage",
successfulChkInsertBytesSentAverage.exportFieldSet(true));
- fs.put("SuccessfulSskInsertBytesSentAverage",
successfulSskInsertBytesSentAverage.exportFieldSet(true));
- fs.put("SuccessfulChkFetchBytesReceivedAverage",
successfulChkFetchBytesReceivedAverage.exportFieldSet(true));
- fs.put("SuccessfulSskFetchBytesReceivedAverage",
successfulSskFetchBytesReceivedAverage.exportFieldSet(true));
- fs.put("SuccessfulChkInsertBytesReceivedAverage",
successfulChkInsertBytesReceivedAverage.exportFieldSet(true));
- fs.put("SuccessfulSskInsertBytesReceivedAverage",
successfulSskInsertBytesReceivedAverage.exportFieldSet(true));
- return fs;
- }
-
- /**
- * Update the node-wide bandwidth I/O stats if the timer has expired
- */
- public void maybeUpdateNodeIOStats(long now) {
- if(now > nextNodeIOStatsUpdateTime) {
- long[] io_stats = IOStatisticCollector.getTotalIO();
- long outdiff;
- long indiff;
- synchronized(ioStatSync) {
- previous_output_stat = last_output_stat;
- previous_input_stat = last_input_stat;
- previous_io_stat_time = last_io_stat_time;
- last_output_stat = io_stats[ 0 ];
- last_input_stat = io_stats[ 1 ];
- last_io_stat_time = now;
- outdiff = last_output_stat -
previous_output_stat;
- indiff = last_input_stat - previous_input_stat;
- }
- if(logMINOR)
- Logger.minor(this, "Last 2 seconds: input:
"+indiff+" output: "+outdiff);
- nextNodeIOStatsUpdateTime = now +
nodeIOStatsUpdateInterval;
- }
- }
-
- public long[] getNodeIOStats() {
- long[] result = new long[6];
- synchronized(ioStatSync) {
- result[ 0 ] = previous_output_stat;
- result[ 1 ] = previous_input_stat;
- result[ 2 ] = previous_io_stat_time;
- result[ 3 ] = last_output_stat;
- result[ 4 ] = last_input_stat;
- result[ 5 ] = last_io_stat_time;
- }
- return result;
- }
-
- public void waitUntilNotOverloaded(boolean isInsert) {
- while(threadLimit < getActiveThreadCount()){
- try{
- Thread.sleep(5000);
- } catch (InterruptedException e) {}
- }
- }
-
- public int getActiveThreadCount() {
- return rootThreadGroup.activeCount() -
node.executor.waitingThreads();
- }
-
- public int getThreadLimit() {
- return threadLimit;
- }
-
- public SimpleFieldSet exportVolatileFieldSet() {
- SimpleFieldSet fs = new SimpleFieldSet(true);
- long now = System.currentTimeMillis();
- fs.put("isUsingWrapper", node.isUsingWrapper());
- long nodeUptimeSeconds = 0;
- synchronized(this) {
- fs.put("startupTime", node.startupTime);
- nodeUptimeSeconds = (now - node.startupTime) / 1000;
- fs.put("uptimeSeconds", nodeUptimeSeconds);
- }
- fs.put("averagePingTime", getNodeAveragePingTime());
- fs.put("bwlimitDelayTime", getBwlimitDelayTime());
- fs.put("networkSizeEstimateSession",
getNetworkSizeEstimate(-1));
- int networkSizeEstimate24hourRecent =
getNetworkSizeEstimate(now - (24*60*60*1000)); // 24 hours
- fs.put("networkSizeEstimate24hourRecent",
networkSizeEstimate24hourRecent);
- int networkSizeEstimate48hourRecent =
getNetworkSizeEstimate(now - (48*60*60*1000)); // 48 hours
- fs.put("networkSizeEstimate48hourRecent",
networkSizeEstimate48hourRecent);
- fs.put("routingMissDistance",
routingMissDistance.currentValue());
- fs.put("backedOffPercent", backedOffPercent.currentValue());
- fs.put("pInstantReject", pRejectIncomingInstantly());
- fs.put("unclaimedFIFOSize", node.usm.getUnclaimedFIFOSize());
-
- /* gather connection statistics */
- DarknetPeerNodeStatus[] peerNodeStatuses =
peers.getDarknetPeerNodeStatuses();
- Arrays.sort(peerNodeStatuses, new Comparator() {
- public int compare(Object first, Object second) {
- DarknetPeerNodeStatus firstNode =
(DarknetPeerNodeStatus) first;
- DarknetPeerNodeStatus secondNode =
(DarknetPeerNodeStatus) second;
- int statusDifference =
firstNode.getStatusValue() - secondNode.getStatusValue();
- if (statusDifference != 0) {
- return statusDifference;
- }
- return
firstNode.getName().compareToIgnoreCase(secondNode.getName());
- }
- });
-
- int numberOfConnected =
PeerNodeStatus.getPeerStatusCount(peerNodeStatuses,
PeerManager.PEER_NODE_STATUS_CONNECTED);
- int numberOfRoutingBackedOff =
PeerNodeStatus.getPeerStatusCount(peerNodeStatuses,
PeerManager.PEER_NODE_STATUS_ROUTING_BACKED_OFF);
- int numberOfTooNew =
PeerNodeStatus.getPeerStatusCount(peerNodeStatuses,
PeerManager.PEER_NODE_STATUS_TOO_NEW);
- int numberOfTooOld =
PeerNodeStatus.getPeerStatusCount(peerNodeStatuses,
PeerManager.PEER_NODE_STATUS_TOO_OLD);
- int numberOfDisconnected =
PeerNodeStatus.getPeerStatusCount(peerNodeStatuses,
PeerManager.PEER_NODE_STATUS_DISCONNECTED);
- int numberOfNeverConnected =
PeerNodeStatus.getPeerStatusCount(peerNodeStatuses,
PeerManager.PEER_NODE_STATUS_NEVER_CONNECTED);
- int numberOfDisabled =
PeerNodeStatus.getPeerStatusCount(peerNodeStatuses,
PeerManager.PEER_NODE_STATUS_DISABLED);
- int numberOfBursting =
PeerNodeStatus.getPeerStatusCount(peerNodeStatuses,
PeerManager.PEER_NODE_STATUS_BURSTING);
- int numberOfListening =
PeerNodeStatus.getPeerStatusCount(peerNodeStatuses,
PeerManager.PEER_NODE_STATUS_LISTENING);
- int numberOfListenOnly =
PeerNodeStatus.getPeerStatusCount(peerNodeStatuses,
PeerManager.PEER_NODE_STATUS_LISTEN_ONLY);
-
- int numberOfSimpleConnected = numberOfConnected +
numberOfRoutingBackedOff;
- int numberOfNotConnected = numberOfTooNew + numberOfTooOld +
numberOfDisconnected + numberOfNeverConnected + numberOfDisabled +
numberOfBursting + numberOfListening + numberOfListenOnly;
-
- fs.put("numberOfConnected", numberOfConnected);
- fs.put("numberOfRoutingBackedOff", numberOfRoutingBackedOff);
- fs.put("numberOfTooNew", numberOfTooNew);
- fs.put("numberOfTooOld", numberOfTooOld);
- fs.put("numberOfDisconnected", numberOfDisconnected);
- fs.put("numberOfNeverConnected", numberOfNeverConnected);
- fs.put("numberOfDisabled", numberOfDisabled);
- fs.put("numberOfBursting", numberOfBursting);
- fs.put("numberOfListening", numberOfListening);
- fs.put("numberOfListenOnly", numberOfListenOnly);
-
- fs.put("numberOfSimpleConnected", numberOfSimpleConnected);
- fs.put("numberOfNotConnected", numberOfNotConnected);
-
- fs.put("numberOfInsertSenders", node.getNumInsertSenders());
- fs.put("numberOfRequestSenders", node.getNumRequestSenders());
- fs.put("numberOfTransferringRequestSenders",
node.getNumTransferringRequestSenders());
- fs.put("numberOfARKFetchers", node.getNumARKFetchers());
-
- long[] total = IOStatisticCollector.getTotalIO();
- long total_output_rate = (total[0]) / nodeUptimeSeconds;
- long total_input_rate = (total[1]) / nodeUptimeSeconds;
- long totalPayloadOutput = node.getTotalPayloadSent();
- long total_payload_output_rate = totalPayloadOutput /
nodeUptimeSeconds;
- int total_payload_output_percent = (int) (100 *
totalPayloadOutput / total[0]);
- fs.put("totalOutputBytes", total[0]);
- fs.put("totalOutputRate", total_output_rate);
- fs.put("totalPayloadOutputBytes", totalPayloadOutput);
- fs.put("totalPayloadOutputRate", total_payload_output_rate);
- fs.put("totalPayloadOutputPercent",
total_payload_output_percent);
- fs.put("totalInputBytes", total[1]);
- fs.put("totalInputRate", total_input_rate);
-
- long[] rate = getNodeIOStats();
- long deltaMS = (rate[5] - rate[2]);
- double recent_output_rate = 1000.0 * (rate[3] - rate[0]) /
deltaMS;
- double recent_input_rate = 1000.0 * (rate[4] - rate[1]) /
deltaMS;
- fs.put("recentOutputRate", recent_output_rate);
- fs.put("recentInputRate", recent_input_rate);
-
- String [] routingBackoffReasons =
peers.getPeerNodeRoutingBackoffReasons();
- if(routingBackoffReasons.length != 0) {
- for(int i=0;i<routingBackoffReasons.length;i++) {
- fs.put("numberWithRoutingBackoffReasons." +
routingBackoffReasons[i],
peers.getPeerNodeRoutingBackoffReasonSize(routingBackoffReasons[i]));
- }
- }
-
- double swaps = (double)node.getSwaps();
- double noSwaps = (double)node.getNoSwaps();
- double numberOfRemotePeerLocationsSeenInSwaps =
(double)node.getNumberOfRemotePeerLocationsSeenInSwaps();
- fs.putSingle("numberOfRemotePeerLocationsSeenInSwaps",
Double.toString(numberOfRemotePeerLocationsSeenInSwaps));
- double avgConnectedPeersPerNode = 0.0;
- if ((numberOfRemotePeerLocationsSeenInSwaps > 0.0) && ((swaps >
0.0) || (noSwaps > 0.0))) {
- avgConnectedPeersPerNode =
numberOfRemotePeerLocationsSeenInSwaps/(swaps+noSwaps);
- }
- fs.putSingle("avgConnectedPeersPerNode",
Double.toString(avgConnectedPeersPerNode));
-
- int startedSwaps = node.getStartedSwaps();
- int swapsRejectedAlreadyLocked =
node.getSwapsRejectedAlreadyLocked();
- int swapsRejectedNowhereToGo =
node.getSwapsRejectedNowhereToGo();
- int swapsRejectedRateLimit = node.getSwapsRejectedRateLimit();
- int swapsRejectedLoop = node.getSwapsRejectedLoop();
- int swapsRejectedRecognizedID =
node.getSwapsRejectedRecognizedID();
- double locationChangePerSession =
node.getLocationChangeSession();
- double locationChangePerSwap = 0.0;
- double locationChangePerMinute = 0.0;
- double swapsPerMinute = 0.0;
- double noSwapsPerMinute = 0.0;
- double swapsPerNoSwaps = 0.0;
- if (swaps > 0) {
- locationChangePerSwap = locationChangePerSession/swaps;
- }
- if ((swaps > 0.0) && (nodeUptimeSeconds >= 60)) {
- locationChangePerMinute =
locationChangePerSession/(double)(nodeUptimeSeconds/60.0);
- }
- if ((swaps > 0.0) && (nodeUptimeSeconds >= 60)) {
- swapsPerMinute = swaps/(double)(nodeUptimeSeconds/60.0);
- }
- if ((noSwaps > 0.0) && (nodeUptimeSeconds >= 60)) {
- noSwapsPerMinute =
noSwaps/(double)(nodeUptimeSeconds/60.0);
- }
- if ((swaps > 0.0) && (noSwaps > 0.0)) {
- swapsPerNoSwaps = swaps/noSwaps;
- }
- fs.put("locationChangePerSession", locationChangePerSession);
- fs.put("locationChangePerSwap", locationChangePerSwap);
- fs.put("locationChangePerMinute", locationChangePerMinute);
- fs.put("swapsPerMinute", swapsPerMinute);
- fs.put("noSwapsPerMinute", noSwapsPerMinute);
- fs.put("swapsPerNoSwaps", swapsPerNoSwaps);
- fs.put("swaps", swaps);
- fs.put("noSwaps", noSwaps);
- fs.put("startedSwaps", startedSwaps);
- fs.put("swapsRejectedAlreadyLocked",
swapsRejectedAlreadyLocked);
- fs.put("swapsRejectedNowhereToGo", swapsRejectedNowhereToGo);
- fs.put("swapsRejectedRateLimit", swapsRejectedRateLimit);
- fs.put("swapsRejectedLoop", swapsRejectedLoop);
- fs.put("swapsRejectedRecognizedID", swapsRejectedRecognizedID);
- long fix32kb = 32 * 1024;
- long cachedKeys = node.getChkDatacache().keyCount();
- long cachedSize = cachedKeys * fix32kb;
- long storeKeys = node.getChkDatastore().keyCount();
- long storeSize = storeKeys * fix32kb;
- long overallKeys = cachedKeys + storeKeys;
- long overallSize = cachedSize + storeSize;
-
- long maxOverallKeys = node.getMaxTotalKeys();
- long maxOverallSize = maxOverallKeys * fix32kb;
-
- double percentOverallKeysOfMax =
(double)(overallKeys*100)/(double)maxOverallKeys;
-
- long cachedStoreHits = node.getChkDatacache().hits();
- long cachedStoreMisses = node.getChkDatacache().misses();
- long cacheAccesses = cachedStoreHits + cachedStoreMisses;
- double percentCachedStoreHitsOfAccesses =
(double)(cachedStoreHits*100) / (double)cacheAccesses;
- long storeHits = node.getChkDatastore().hits();
- long storeMisses = node.getChkDatastore().misses();
- long storeAccesses = storeHits + storeMisses;
- double percentStoreHitsOfAccesses = (double)(storeHits*100) /
(double)storeAccesses;
- long overallAccesses = storeAccesses + cacheAccesses;
- double avgStoreAccessRate =
(double)overallAccesses/(double)nodeUptimeSeconds;
-
- fs.put("cachedKeys", cachedKeys);
- fs.put("cachedSize", cachedSize);
- fs.put("storeKeys", storeKeys);
- fs.put("storeSize", storeSize);
- fs.put("overallKeys", overallKeys);
- fs.put("overallSize", overallSize);
- fs.put("maxOverallKeys", maxOverallKeys);
- fs.put("maxOverallSize", maxOverallSize);
- fs.put("percentOverallKeysOfMax", percentOverallKeysOfMax);
- fs.put("cachedStoreHits", cachedStoreHits);
- fs.put("cachedStoreMisses", cachedStoreMisses);
- fs.put("cacheAccesses", cacheAccesses);
- fs.put("percentCachedStoreHitsOfAccesses",
percentCachedStoreHitsOfAccesses);
- fs.put("storeHits", storeHits);
- fs.put("storeMisses", storeMisses);
- fs.put("storeAccesses", storeAccesses);
- fs.put("percentStoreHitsOfAccesses",
percentStoreHitsOfAccesses);
- fs.put("overallAccesses", overallAccesses);
- fs.put("avgStoreAccessRate", avgStoreAccessRate);
-
- Runtime rt = Runtime.getRuntime();
- float freeMemory = (float) rt.freeMemory();
- float totalMemory = (float) rt.totalMemory();
- float maxMemory = (float) rt.maxMemory();
-
- long usedJavaMem = (long)(totalMemory - freeMemory);
- long allocatedJavaMem = (long)totalMemory;
- long maxJavaMem = (long)maxMemory;
- int availableCpus = rt.availableProcessors();
-
- fs.put("freeJavaMemory", (long)freeMemory);
- fs.put("usedJavaMemory", usedJavaMem);
- fs.put("allocatedJavaMemory", allocatedJavaMem);
- fs.put("maximumJavaMemory", maxJavaMem);
- fs.put("availableCPUs", availableCpus);
- fs.put("runningThreadCount", getActiveThreadCount());
-
- return fs;
- }
-
- public void setOutputLimit(int obwLimit) {
-
requestOutputThrottle.changeNanosAndBucketSize((int)((1000L*1000L*1000L) /
(obwLimit * FRACTION_OF_BANDWIDTH_USED_BY_REQUESTS)), Math.max(obwLimit*60,
32768*20));
- if(node.inputLimitDefault) {
- setInputLimit(obwLimit * 4);
- }
- }
-
- public void setInputLimit(int ibwLimit) {
-
requestInputThrottle.changeNanosAndBucketSize((int)((1000L*1000L*1000L) /
(ibwLimit * FRACTION_OF_BANDWIDTH_USED_BY_REQUESTS)), Math.max(ibwLimit*60,
32768*20));
- }
-
- public boolean isTestnetEnabled() {
- return node.isTestnetEnabled();
- }
-
- public boolean getRejectReasonsTable(HTMLNode table) {
- return preemptiveRejectReasons.toTableRows(table) > 0;
- }
-
- public boolean getLocalRejectReasonsTable(HTMLNode table) {
- return localPreemptiveRejectReasons.toTableRows(table) > 0;
- }
-
- public void requestCompleted(boolean succeeded, boolean isRemote,
boolean isSSK) {
- globalFetchPSuccess.report(succeeded ? 1.0 : 0.0);
- if(isSSK)
- sskFetchPSuccess.report(succeeded ? 1.0 : 0.0);
- else
- chkFetchPSuccess.report(succeeded ? 1.0 : 0.0);
- if(isRemote)
- remoteFetchPSuccess.report(succeeded ? 1.0 : 0.0);
- else
- localFetchPSuccess.report(succeeded ? 1.0 : 0.0);
- }
-
- private final DecimalFormat fix3p3pct = new DecimalFormat("##0.000%");
- private final NumberFormat thousendPoint = NumberFormat.getInstance();
-
- public void fillSuccessRateBox(HTMLNode parent) {
- HTMLNode list = parent.addChild("table", "border", "0");
- final TrivialRunningAverage[] averages = new
TrivialRunningAverage[] {
- globalFetchPSuccess,
- chkFetchPSuccess,
- sskFetchPSuccess,
- localFetchPSuccess,
- remoteFetchPSuccess
- };
- final String[] names = new String[] {
- // FIXME l10n, but atm this only shows up in
advanced mode
- "All requests",
- "CHKs",
- "SSKs",
- "Local requests",
- "Remote requests"
- };
- HTMLNode row = list.addChild("tr");
- row.addChild("th", "Group");
- row.addChild("th", "P(success)");
- row.addChild("th", "Count");
-
- for(int i=0;i<averages.length;i++) {
- row = list.addChild("tr");
- row.addChild("td", names[i]);
- if (averages[i].countReports()==0) {
- row.addChild("td", "-");
- row.addChild("td", "0");
- } else {
- row.addChild("td",
fix3p3pct.format(averages[i].currentValue()));
- row.addChild("td",
thousendPoint.format(averages[i].countReports()));
- }
- }
- }
-
-}
+package freenet.node;
+
+import java.io.File;
+import java.text.DecimalFormat;
+import java.text.NumberFormat;
+import java.util.Arrays;
+import java.util.Comparator;
+
+import freenet.config.InvalidConfigValueException;
+import freenet.config.SubConfig;
+import freenet.crypt.RandomSource;
+import freenet.io.comm.DMT;
+import freenet.io.comm.IOStatisticCollector;
+import freenet.l10n.L10n;
+import freenet.support.HTMLNode;
+import freenet.support.Logger;
+import freenet.support.SimpleFieldSet;
+import freenet.support.SizeUtil;
+import freenet.support.StringCounter;
+import freenet.support.TimeUtil;
+import freenet.support.TokenBucket;
+import freenet.support.api.BooleanCallback;
+import freenet.support.api.IntCallback;
+import freenet.support.api.LongCallback;
+import freenet.support.math.BootstrappingDecayingRunningAverage;
+import freenet.support.math.RunningAverage;
+import freenet.support.math.TimeDecayingRunningAverage;
+import freenet.support.math.TrivialRunningAverage;
+
+/** Node (as opposed to NodeClientCore) level statistics. Includes
shouldRejectRequest(), but not limited
+ * to stuff required to implement that. */
+public class NodeStats implements Persistable {
+
+ /** Sub-max ping time. If ping is greater than this, we reject some
requests. */
+ public static final long SUB_MAX_PING_TIME = 700;
+ /** Maximum overall average ping time. If ping is greater than this,
+ * we reject all requests. */
+ public static final long MAX_PING_TIME = 1500;
+ /** Maximum throttled packet delay. If the throttled packet delay is
greater
+ * than this, reject all packets. */
+ public static final long MAX_THROTTLE_DELAY = 3000;
+ /** If the throttled packet delay is less than this, reject no packets;
if it's
+ * between the two, reject some packets. */
+ public static final long SUB_MAX_THROTTLE_DELAY = 2000;
+ /** How high can bwlimitDelayTime be before we alert (in milliseconds)*/
+ public static final long MAX_BWLIMIT_DELAY_TIME_ALERT_THRESHOLD =
MAX_THROTTLE_DELAY*2;
+ /** How high can nodeAveragePingTime be before we alert (in
milliseconds)*/
+ public static final long MAX_NODE_AVERAGE_PING_TIME_ALERT_THRESHOLD =
MAX_PING_TIME*2;
+ /** How long we're over the bwlimitDelayTime threshold before we alert
(in milliseconds)*/
+ public static final long MAX_BWLIMIT_DELAY_TIME_ALERT_DELAY =
10*60*1000; // 10 minutes
+ /** How long we're over the nodeAveragePingTime threshold before we
alert (in milliseconds)*/
+ public static final long MAX_NODE_AVERAGE_PING_TIME_ALERT_DELAY =
10*60*1000; // 10 minutes
+ /** Accept one request every 10 seconds regardless, to ensure we update
the
+ * block send time.
+ */
+ public static final int MAX_INTERREQUEST_TIME = 10*1000;
+
+ /** Fudge factor for high level bandwidth limiting. FIXME should be a
long term running average */
+ public static final double FRACTION_OF_BANDWIDTH_USED_BY_REQUESTS = 0.8;
+
+ private final Node node;
+ private MemoryChecker myMemoryChecker;
+ public final PeerManager peers;
+
+ final RandomSource hardRandom;
+
+ private boolean logMINOR;
+
+ /** first time bwlimitDelay was over PeerManagerUserAlert threshold */
+ private long firstBwlimitDelayTimeThresholdBreak ;
+ /** first time nodeAveragePing was over PeerManagerUserAlert threshold
*/
+ private long firstNodeAveragePingTimeThresholdBreak;
+ /** bwlimitDelay PeerManagerUserAlert should happen if true */
+ public boolean bwlimitDelayAlertRelevant;
+ /** nodeAveragePing PeerManagerUserAlert should happen if true */
+ public boolean nodeAveragePingAlertRelevant;
+ /** Average proportion of requests rejected immediately due to overload
*/
+ public final TimeDecayingRunningAverage pInstantRejectIncoming;
+
+ /** Average delay caused by throttling for sending a packet */
+ final TimeDecayingRunningAverage throttledPacketSendAverage;
+
+ // Bytes used by each different type of local/remote chk/ssk
request/insert
+ final TimeDecayingRunningAverage remoteChkFetchBytesSentAverage;
+ final TimeDecayingRunningAverage remoteSskFetchBytesSentAverage;
+ final TimeDecayingRunningAverage remoteChkInsertBytesSentAverage;
+ final TimeDecayingRunningAverage remoteSskInsertBytesSentAverage;
+ final TimeDecayingRunningAverage remoteChkFetchBytesReceivedAverage;
+ final TimeDecayingRunningAverage remoteSskFetchBytesReceivedAverage;
+ final TimeDecayingRunningAverage remoteChkInsertBytesReceivedAverage;
+ final TimeDecayingRunningAverage remoteSskInsertBytesReceivedAverage;
+ final TimeDecayingRunningAverage localChkFetchBytesSentAverage;
+ final TimeDecayingRunningAverage localSskFetchBytesSentAverage;
+ final TimeDecayingRunningAverage localChkInsertBytesSentAverage;
+ final TimeDecayingRunningAverage localSskInsertBytesSentAverage;
+ final TimeDecayingRunningAverage localChkFetchBytesReceivedAverage;
+ final TimeDecayingRunningAverage localSskFetchBytesReceivedAverage;
+ final TimeDecayingRunningAverage localChkInsertBytesReceivedAverage;
+ final TimeDecayingRunningAverage localSskInsertBytesReceivedAverage;
+
+ // Bytes used by successful chk/ssk request/insert.
+ // Note: These are used to determine whether to accept a request,
+ // hence they should be roughly representative of incoming - NOT LOCAL -
+ // requests. Therefore, while we DO report local successful requests,
+ // we only report the portion which will be consistent with a remote
+ // request. If there is both a Handler and a Sender, it's a remote
+ // request, report both. If there is only a Sender, report only the
+ // received bytes (for a request). Etc.
+
+ // Note that these are always reported in the Handler or the
NodeClientCore
+ // call taking its place.
+ final TimeDecayingRunningAverage successfulChkFetchBytesSentAverage;
+ final TimeDecayingRunningAverage successfulSskFetchBytesSentAverage;
+ final TimeDecayingRunningAverage successfulChkInsertBytesSentAverage;
+ final TimeDecayingRunningAverage successfulSskInsertBytesSentAverage;
+ final TimeDecayingRunningAverage successfulChkFetchBytesReceivedAverage;
+ final TimeDecayingRunningAverage successfulSskFetchBytesReceivedAverage;
+ final TimeDecayingRunningAverage
successfulChkInsertBytesReceivedAverage;
+ final TimeDecayingRunningAverage
successfulSskInsertBytesReceivedAverage;
+
+ final TrivialRunningAverage globalFetchPSuccess;
+ final TrivialRunningAverage chkFetchPSuccess;
+ final TrivialRunningAverage sskFetchPSuccess;
+ final TrivialRunningAverage localFetchPSuccess;
+ final TrivialRunningAverage remoteFetchPSuccess;
+
+ File persistTarget;
+ File persistTemp;
+ private long previous_input_stat;
+ private long previous_output_stat;
+ private long previous_io_stat_time;
+ private long last_input_stat;
+ private long last_output_stat;
+ private long last_io_stat_time;
+ private final Object ioStatSync = new Object();
+ /** Next time to update the node I/O stats */
+ private long nextNodeIOStatsUpdateTime = -1;
+ /** Node I/O stats update interval (milliseconds) */
+ private static final long nodeIOStatsUpdateInterval = 2000;
+
+ /** Token bucket for output bandwidth used by requests */
+ final TokenBucket requestOutputThrottle;
+ /** Token bucket for input bandwidth used by requests */
+ final TokenBucket requestInputThrottle;
+
+ // various metrics
+ public final RunningAverage routingMissDistance;
+ public final RunningAverage backedOffPercent;
+ public final RunningAverage avgCacheLocation;
+ public final RunningAverage avgStoreLocation;
+ public final RunningAverage avgCacheSuccess;
+ public final RunningAverage avgStoreSuccess;
+ // FIXME: does furthest{Store,Cache}Success need to be synchronized?
+ public double furthestCacheSuccess=0.0;
+ public double furthestStoreSuccess=0.0;
+ protected final Persister persister;
+
+ protected final RunningAverage avgRequestLocation;
+
+ // ThreadCounting stuffs
+ public final ThreadGroup rootThreadGroup;
+ private int threadLimit;
+
+ // Free heap memory threshold stuffs
+ private long freeHeapBytesThreshold;
+ private int freeHeapPercentThreshold;
+
+ final NodePinger nodePinger;
+
+ final StringCounter preemptiveRejectReasons;
+ final StringCounter localPreemptiveRejectReasons;
+
+ // Enable this if you run into hard to debug OOMs.
+ // Disabled to prevent long pauses every 30 seconds.
+ private int aggressiveGCModificator = -1 /*250*/;
+
+ // Peers stats
+ /** Next time to update PeerManagerUserAlert stats */
+ private long nextPeerManagerUserAlertStatsUpdateTime = -1;
+ /** PeerManagerUserAlert stats update interval (milliseconds) */
+ private static final long peerManagerUserAlertStatsUpdateInterval =
1000; // 1 second
+
+ NodeStats(Node node, int sortOrder, SubConfig statsConfig,
SimpleFieldSet oldThrottleFS, int obwLimit, int ibwLimit) throws
NodeInitException {
+ logMINOR = Logger.shouldLog(Logger.MINOR, this);
+ this.node = node;
+ this.peers = node.peers;
+ this.hardRandom = node.random;
+ this.routingMissDistance = new TimeDecayingRunningAverage(0.0,
180000, 0.0, 1.0, node);
+ this.backedOffPercent = new TimeDecayingRunningAverage(0.0,
180000, 0.0, 1.0, node);
+ double nodeLoc=node.lm.getLocation();
+ // FIXME PLEASE; (int) casts; (maxCacheKeys>MAXINT?) This value
will probably end up being a small constant anyway (200?).
+ this.avgCacheLocation = new
BootstrappingDecayingRunningAverage(nodeLoc, 0.0, 1.0, (int)node.maxCacheKeys,
null);
+ this.avgStoreLocation = new
BootstrappingDecayingRunningAverage(nodeLoc, 0.0, 1.0, (int)node.maxStoreKeys,
null);
+ this.avgCacheSuccess = new
BootstrappingDecayingRunningAverage(nodeLoc, 0.0, 1.0, 10000, null);
+ this.avgStoreSuccess = new
BootstrappingDecayingRunningAverage(nodeLoc, 0.0, 1.0, 10000, null);
+ this.avgRequestLocation = new
BootstrappingDecayingRunningAverage(nodeLoc, 0.0, 1.0, 10000, null);
+ preemptiveRejectReasons = new StringCounter();
+ localPreemptiveRejectReasons = new StringCounter();
+ pInstantRejectIncoming = new TimeDecayingRunningAverage(0,
60000, 0.0, 1.0, node);
+ ThreadGroup tg = Thread.currentThread().getThreadGroup();
+ while(tg.getParent() != null) tg = tg.getParent();
+ this.rootThreadGroup = tg;
+ throttledPacketSendAverage =
+ new TimeDecayingRunningAverage(1, 10*60*1000 /* should
be significantly longer than a typical transfer */, 0, Long.MAX_VALUE, node);
+ nodePinger = new NodePinger(node);
+
+ previous_input_stat = 0;
+ previous_output_stat = 0;
+ previous_io_stat_time = 1;
+ last_input_stat = 0;
+ last_output_stat = 0;
+ last_io_stat_time = 3;
+
+ statsConfig.register("threadLimit", 500, sortOrder++, true,
true, "NodeStat.threadLimit", "NodeStat.threadLimitLong",
+ new IntCallback() {
+ public int get() {
+ return threadLimit;
+ }
+ public void set(int val) throws
InvalidConfigValueException {
+ if(val == get()) return;
+ if(val < 100)
+ throw new
InvalidConfigValueException(l10n("valueTooLow"));
+ threadLimit = val;
+ }
+ });
+ threadLimit = statsConfig.getInt("threadLimit");
+
+ // Yes it could be in seconds insteed of multiples of 0.12, but
we don't want people to play with it :)
+ statsConfig.register("aggressiveGC", aggressiveGCModificator,
sortOrder++, true, false, "NodeStat.aggressiveGC", "NodeStat.aggressiveGCLong",
+ new IntCallback() {
+ public int get() {
+ return aggressiveGCModificator;
+ }
+ public void set(int val) throws
InvalidConfigValueException {
+ if(val == get()) return;
+ Logger.normal(this, "Changing
aggressiveGCModificator to "+val);
+ aggressiveGCModificator = val;
+ }
+ });
+ aggressiveGCModificator = statsConfig.getInt("aggressiveGC");
+
+ myMemoryChecker = new MemoryChecker(node.ps,
aggressiveGCModificator);
+ statsConfig.register("memoryChecker", true, sortOrder++, true,
false, "NodeStat.memCheck", "NodeStat.memCheckLong",
+ new BooleanCallback(){
+ public boolean get() {
+ return
myMemoryChecker.isRunning();
+ }
+
+ public void set(boolean val) throws
InvalidConfigValueException {
+ if(val == get()) return;
+
+ if(val)
+ myMemoryChecker.start();
+ else
+
myMemoryChecker.terminate();
+ }
+ });
+ if(statsConfig.getBoolean("memoryChecker"))
+ myMemoryChecker.start();
+
+ statsConfig.register("freeHeapBytesThreshold", "5M",
sortOrder++, true, true, "NodeStat.freeHeapBytesThreshold",
"NodeStat.freeHeapBytesThresholdLong",
+ new LongCallback() {
+ public long get() {
+ return freeHeapBytesThreshold;
+ }
+ public void set(long val) throws
InvalidConfigValueException {
+ if(val == get()) return;
+ if(val < 0)
+ throw new
InvalidConfigValueException(l10n("valueTooLow"));
+ freeHeapBytesThreshold = val;
+ }
+ });
+ freeHeapBytesThreshold =
statsConfig.getLong("freeHeapBytesThreshold");
+
+ statsConfig.register("freeHeapPercentThreshold", "5",
sortOrder++, true, true, "NodeStat.freeHeapPercentThreshold",
"NodeStat.freeHeapPercentThresholdLong",
+ new IntCallback() {
+ public int get() {
+ return freeHeapPercentThreshold;
+ }
+ public void set(int val) throws
InvalidConfigValueException {
+ if(val == get()) return;
+ if(val < 0 || val >= 100)
+ throw new
InvalidConfigValueException(l10n("mustBePercentValueNotFull"));
+ freeHeapPercentThreshold = val;
+ }
+ });
+ freeHeapPercentThreshold =
statsConfig.getInt("freeHeapPercentThreshold");
+
+ persister = new ConfigurablePersister(this, statsConfig,
"nodeThrottleFile", "node-throttle.dat", sortOrder++, true, false,
+ "NodeStat.statsPersister",
"NodeStat.statsPersisterLong", node.ps);
+
+ SimpleFieldSet throttleFS = persister.read();
+
+ if(throttleFS == null)
+ throttleFS = oldThrottleFS;
+
+ if(logMINOR) Logger.minor(this, "Read
throttleFS:\n"+throttleFS);
+
+ // Guesstimates. Hopefully well over the reality.
+ localChkFetchBytesSentAverage = new
TimeDecayingRunningAverage(500, 180000, 0.0, 1024*1024*1024, throttleFS == null
? null : throttleFS.subset("LocalChkFetchBytesSentAverage"), node);
+ localSskFetchBytesSentAverage = new
TimeDecayingRunningAverage(500, 180000, 0.0, 1024*1024*1024, throttleFS == null
? null : throttleFS.subset("LocalSskFetchBytesSentAverage"), node);
+ localChkInsertBytesSentAverage = new
TimeDecayingRunningAverage(32768, 180000, 0.0, 1024*1024*1024, throttleFS ==
null ? null : throttleFS.subset("LocalChkInsertBytesSentAverage"), node);
+ localSskInsertBytesSentAverage = new
TimeDecayingRunningAverage(2048, 180000, 0.0, 1024*1024*1024, throttleFS ==
null ? null : throttleFS.subset("LocalSskInsertBytesSentAverage"), node);
+ localChkFetchBytesReceivedAverage = new
TimeDecayingRunningAverage(32768+2048/*path folding*/, 180000, 0.0,
1024*1024*1024, throttleFS == null ? null :
throttleFS.subset("LocalChkFetchBytesReceivedAverage"), node);
+ localSskFetchBytesReceivedAverage = new
TimeDecayingRunningAverage(2048, 180000, 0.0, 1024*1024*1024, throttleFS ==
null ? null : throttleFS.subset("LocalSskFetchBytesReceivedAverage"), node);
+ localChkInsertBytesReceivedAverage = new
TimeDecayingRunningAverage(1024, 180000, 0.0, 1024*1024*1024, throttleFS ==
null ? null : throttleFS.subset("LocalChkInsertBytesReceivedAverage"), node);
+ localSskInsertBytesReceivedAverage = new
TimeDecayingRunningAverage(500, 180000, 0.0, 1024*1024*1024, throttleFS == null
? null : throttleFS.subset("LocalChkInsertBytesReceivedAverage"), node);
+
+ remoteChkFetchBytesSentAverage = new
TimeDecayingRunningAverage(32768+1024+500+2048/*path folding*/, 180000, 0.0,
1024*1024*1024, throttleFS == null ? null :
throttleFS.subset("RemoteChkFetchBytesSentAverage"), node);
+ remoteSskFetchBytesSentAverage = new
TimeDecayingRunningAverage(1024+1024+500, 180000, 0.0, 1024*1024*1024,
throttleFS == null ? null :
throttleFS.subset("RemoteSskFetchBytesSentAverage"), node);
+ remoteChkInsertBytesSentAverage = new
TimeDecayingRunningAverage(32768+32768+1024, 180000, 0.0, 1024*1024*1024,
throttleFS == null ? null :
throttleFS.subset("RemoteChkInsertBytesSentAverage"), node);
+ remoteSskInsertBytesSentAverage = new
TimeDecayingRunningAverage(1024+1024+500, 180000, 0.0, 1024*1024*1024,
throttleFS == null ? null :
throttleFS.subset("RemoteSskInsertBytesSentAverage"), node);
+ remoteChkFetchBytesReceivedAverage = new
TimeDecayingRunningAverage(32768+1024+500+2048/*path folding*/, 180000, 0.0,
1024*1024*1024, throttleFS == null ? null :
throttleFS.subset("RemoteChkFetchBytesReceivedAverage"), node);
+ remoteSskFetchBytesReceivedAverage = new
TimeDecayingRunningAverage(2048+500, 180000, 0.0, 1024*1024*1024, throttleFS ==
null ? null : throttleFS.subset("RemoteSskFetchBytesReceivedAverage"), node);
+ remoteChkInsertBytesReceivedAverage = new
TimeDecayingRunningAverage(32768+1024+500, 180000, 0.0, 1024*1024*1024,
throttleFS == null ? null :
throttleFS.subset("RemoteChkInsertBytesReceivedAverage"), node);
+ remoteSskInsertBytesReceivedAverage = new
TimeDecayingRunningAverage(1024+1024+500, 180000, 0.0, 1024*1024*1024,
throttleFS == null ? null :
throttleFS.subset("RemoteSskInsertBytesReceivedAverage"), node);
+
+ successfulChkFetchBytesSentAverage = new
TimeDecayingRunningAverage(32768+1024+500+2048/*path folding*/, 180000, 0.0,
1024*1024*1024, throttleFS == null ? null :
throttleFS.subset("SuccessfulChkFetchBytesSentAverage"), node);
+ successfulSskFetchBytesSentAverage = new
TimeDecayingRunningAverage(1024+1024+500, 180000, 0.0, 1024*1024*1024,
throttleFS == null ? null :
throttleFS.subset("SuccessfulSskFetchBytesSentAverage"), node);
+ successfulChkInsertBytesSentAverage = new
TimeDecayingRunningAverage(32768+32768+1024, 180000, 0.0, 1024*1024*1024,
throttleFS == null ? null :
throttleFS.subset("SuccessfulChkInsertBytesSentAverage"), node);
+ successfulSskInsertBytesSentAverage = new
TimeDecayingRunningAverage(1024+1024+500, 180000, 0.0, 1024*1024*1024,
throttleFS == null ? null :
throttleFS.subset("SuccessfulSskInsertBytesSentAverage"), node);
+ successfulChkFetchBytesReceivedAverage = new
TimeDecayingRunningAverage(32768+1024+500+2048/*path folding*/, 180000, 0.0,
1024*1024*1024, throttleFS == null ? null :
throttleFS.subset("SuccessfulChkFetchBytesReceivedAverage"), node);
+ successfulSskFetchBytesReceivedAverage = new
TimeDecayingRunningAverage(2048+500, 180000, 0.0, 1024*1024*1024, throttleFS ==
null ? null : throttleFS.subset("SuccessfulSskFetchBytesReceivedAverage"),
node);
+ successfulChkInsertBytesReceivedAverage = new
TimeDecayingRunningAverage(32768+1024+500, 180000, 0.0, 1024*1024*1024,
throttleFS == null ? null :
throttleFS.subset("SuccessfulChkInsertBytesReceivedAverage"), node);
+ successfulSskInsertBytesReceivedAverage = new
TimeDecayingRunningAverage(1024+1024+500, 180000, 0.0, 1024*1024*1024,
throttleFS == null ? null :
throttleFS.subset("SuccessfulSskInsertBytesReceivedAverage"), node);
+
+ globalFetchPSuccess = new TrivialRunningAverage();
+ chkFetchPSuccess = new TrivialRunningAverage();
+ sskFetchPSuccess = new TrivialRunningAverage();
+ localFetchPSuccess = new TrivialRunningAverage();
+ remoteFetchPSuccess = new TrivialRunningAverage();
+
+ requestOutputThrottle =
+ new TokenBucket(Math.max(obwLimit*60, 32768*20),
(int)((1000L*1000L*1000L) / (obwLimit *
FRACTION_OF_BANDWIDTH_USED_BY_REQUESTS)), 0);
+ requestInputThrottle =
+ new TokenBucket(Math.max(ibwLimit*60, 32768*20),
(int)((1000L*1000L*1000L) / (ibwLimit *
FRACTION_OF_BANDWIDTH_USED_BY_REQUESTS)), 0);
+
+ estimatedSizeOfOneThrottledPacket = 1024 +
DMT.packetTransmitSize(1024, 32) +
+ node.estimateFullHeadersLengthOneMessage();
+ }
+
+ protected String l10n(String key) {
+ return L10n.getString("NodeStats."+key);
+ }
+
+ public void start() throws NodeInitException {
+ nodePinger.start();
+ persister.start();
+
node.getTicker().queueTimedJob(throttledPacketSendAverageIdleUpdater,
CHECK_THROTTLE_TIME);
+ }
+
+ /** Every 60 seconds, check whether we need to adjust the bandwidth
delay time because of idleness.
+ * (If no packets have been sent, the throttledPacketSendAverage should
decrease; if it doesn't, it may go high,
+ * and then no requests will be accepted, and it will stay high
forever. */
+ static final int CHECK_THROTTLE_TIME = 60 * 1000;
+ /** Absolute limit of 4MB queued to any given peer. FIXME make this
configurable.
+ * Note that for many MessageItem's, the actual memory usage will be
significantly more than this figure. */
+ private static final long MAX_PEER_QUEUE_BYTES = 4 * 1024 * 1024;
+ /** Don't accept requests if it'll take more than 10 minutes to send
the current message queue */
+ private static final double MAX_PEER_QUEUE_TIME = 10 * 60 * 1000.0;
+
+ private long lastAcceptedRequest = -1;
+
+ final int estimatedSizeOfOneThrottledPacket;
+
+ final Runnable throttledPacketSendAverageIdleUpdater =
+ new Runnable() {
+ public void run() {
+ long now = System.currentTimeMillis();
+ try {
+
if(throttledPacketSendAverage.lastReportTime() < now - 5000) { // if last
report more than 5 seconds ago
+ // shouldn't take long
+
node.outputThrottle.blockingGrab(estimatedSizeOfOneThrottledPacket);
+
node.outputThrottle.recycle(estimatedSizeOfOneThrottledPacket);
+ long after =
System.currentTimeMillis();
+ // Report time it takes to grab
the bytes.
+
throttledPacketSendAverage.report(after - now);
+ }
+ } catch (Throwable t) {
+ Logger.error(this, "Caught "+t, t);
+ } finally {
+ node.getTicker().queueTimedJob(this,
CHECK_THROTTLE_TIME);
+ long end = System.currentTimeMillis();
+ if(logMINOR)
+ Logger.minor(this, "Throttle
check took "+TimeUtil.formatTime(end-now,2,true));
+ }
+ }
+ };
+
+ /* return reject reason as string if should reject, otherwise return
null */
+ public String shouldRejectRequest(boolean canAcceptAnyway, boolean
isInsert, boolean isSSK, boolean isLocal, PeerNode source) {
+ logMINOR = Logger.shouldLog(Logger.MINOR, this);
+ if(logMINOR) dumpByteCostAverages();
+
+ int threadCount = getActiveThreadCount();
+ if(threadLimit < threadCount) {
+ pInstantRejectIncoming.report(1.0);
+ rejected(">threadLimit", isLocal);
+ return ">threadLimit ("+threadCount+'/'+threadLimit+')';
+ }
+
+ double bwlimitDelayTime =
throttledPacketSendAverage.currentValue();
+
+ // If no recent reports, no packets have been sent; correct the
average downwards.
+ long now = System.currentTimeMillis();
+ double pingTime = nodePinger.averagePingTime();
+ synchronized(this) {
+ // Round trip time
+ if(pingTime > MAX_PING_TIME) {
+ if((now - lastAcceptedRequest >
MAX_INTERREQUEST_TIME) && canAcceptAnyway) {
+ if(logMINOR) Logger.minor(this,
"Accepting request anyway (take one every 10 secs to keep bwlimitDelayTime
updated)");
+ } else {
+ pInstantRejectIncoming.report(1.0);
+ rejected(">MAX_PING_TIME", isLocal);
+ return ">MAX_PING_TIME
("+TimeUtil.formatTime((long)pingTime, 2, true)+ ')';
+ }
+ } else if(pingTime > SUB_MAX_PING_TIME) {
+ double x = ((double)(pingTime -
SUB_MAX_PING_TIME)) / (MAX_PING_TIME - SUB_MAX_PING_TIME);
+ if(hardRandom.nextDouble() < x) {
+ pInstantRejectIncoming.report(1.0);
+ rejected(">SUB_MAX_PING_TIME", isLocal);
+ return ">SUB_MAX_PING_TIME
("+TimeUtil.formatTime((long)pingTime, 2, true)+ ')';
+ }
+ }
+
+ // Bandwidth limited packets
+ if(bwlimitDelayTime > MAX_THROTTLE_DELAY) {
+ if((now - lastAcceptedRequest >
MAX_INTERREQUEST_TIME) && canAcceptAnyway) {
+ if(logMINOR) Logger.minor(this,
"Accepting request anyway (take one every 10 secs to keep bwlimitDelayTime
updated)");
+ } else {
+ pInstantRejectIncoming.report(1.0);
+ rejected(">MAX_THROTTLE_DELAY",
isLocal);
+ return ">MAX_THROTTLE_DELAY
("+TimeUtil.formatTime((long)bwlimitDelayTime, 2, true)+ ')';
+ }
+ } else if(bwlimitDelayTime > SUB_MAX_THROTTLE_DELAY) {
+ double x = ((double)(bwlimitDelayTime -
SUB_MAX_THROTTLE_DELAY)) / (MAX_THROTTLE_DELAY - SUB_MAX_THROTTLE_DELAY);
+ if(hardRandom.nextDouble() < x) {
+ pInstantRejectIncoming.report(1.0);
+ rejected(">SUB_MAX_THROTTLE_DELAY",
isLocal);
+ return ">SUB_MAX_THROTTLE_DELAY
("+TimeUtil.formatTime((long)bwlimitDelayTime, 2, true)+ ')';
+ }
+ }
+
+ }
+
+ // Successful cluster timeout protection.
+ // Reject request if the result of all our current requests
completing simultaneously would be that
+ // some of them timeout.
+
+ // Never reject a CHK and accept an SSK. Because if we do that,
we would be constantly accepting SSKs, as there
+ // would never be enough space for a CHK. So we add 1 to each
type of request's count before computing the
+ // bandwidth liability. Thus, if we have exactly enough space
for 1 SSK and 1 CHK, we can accept either, and
+ // when one of either type completes, we can accept one of
either type again: We never let SSKs drain the
+ // "bucket" and block CHKs.
+
+ int numCHKRequests = node.getNumCHKRequests() + 1;
+ int numSSKRequests = node.getNumSSKRequests() + 1;
+ int numCHKInserts = node.getNumCHKInserts() + 1;
+ int numSSKInserts = node.getNumSSKInserts() + 1;
+ if(logMINOR)
+ Logger.minor(this, "Running (adjusted): CHK fetch
"+numCHKRequests+" SSK fetch "+numSSKRequests+" CHK insert "+numCHKInserts+"
SSK insert "+numSSKInserts);
+
+ double bandwidthLiabilityOutput =
+ successfulChkFetchBytesSentAverage.currentValue() *
numCHKRequests +
+ successfulSskFetchBytesSentAverage.currentValue() *
numSSKRequests +
+ successfulChkInsertBytesSentAverage.currentValue() *
numCHKInserts +
+ successfulSskInsertBytesSentAverage.currentValue() *
numSSKInserts;
+ double bandwidthAvailableOutput =
+ node.getOutputBandwidthLimit() * 90; // 90 seconds at
full power; we have to leave some time for the search as well
+ bandwidthAvailableOutput *=
NodeStats.FRACTION_OF_BANDWIDTH_USED_BY_REQUESTS;
+ if(bandwidthLiabilityOutput > bandwidthAvailableOutput) {
+ pInstantRejectIncoming.report(1.0);
+ rejected("Output bandwidth liability", isLocal);
+ return "Output bandwidth liability
("+bandwidthLiabilityOutput+" > "+bandwidthAvailableOutput+")";
+ }
+
+ double bandwidthLiabilityInput =
+ successfulChkFetchBytesReceivedAverage.currentValue() *
numCHKRequests +
+ successfulSskFetchBytesReceivedAverage.currentValue() *
numSSKRequests +
+ successfulChkInsertBytesReceivedAverage.currentValue()
* numCHKInserts +
+ successfulSskInsertBytesReceivedAverage.currentValue()
* numSSKInserts;
+ double bandwidthAvailableInput =
+ node.getInputBandwidthLimit() * 90; // 90 seconds at
full power
+ bandwidthAvailableInput *=
NodeStats.FRACTION_OF_BANDWIDTH_USED_BY_REQUESTS;
+ if(bandwidthLiabilityInput > bandwidthAvailableInput) {
+ pInstantRejectIncoming.report(1.0);
+ rejected("Input bandwidth liability", isLocal);
+ return "Input bandwidth liability
("+bandwidthLiabilityInput+" > "+bandwidthAvailableInput+")";
+ }
+
+
+ // Do we have the bandwidth?
+ double expected = this.getThrottle(isLocal, isInsert, isSSK,
true).currentValue();
+ int expectedSent = (int)Math.max(expected, 0);
+ if(logMINOR)
+ Logger.minor(this, "Expected sent bytes:
"+expectedSent);
+ if(!requestOutputThrottle.instantGrab(expectedSent)) {
+ pInstantRejectIncoming.report(1.0);
+ rejected("Insufficient output bandwidth", isLocal);
+ return "Insufficient output bandwidth";
+ }
+ expected = this.getThrottle(isLocal, isInsert, isSSK,
false).currentValue();
+ int expectedReceived = (int)Math.max(expected, 0);
+ if(logMINOR)
+ Logger.minor(this, "Expected received bytes:
"+expectedSent);
+ if(!requestInputThrottle.instantGrab(expectedReceived)) {
+ requestOutputThrottle.recycle(expectedSent);
+ pInstantRejectIncoming.report(1.0);
+ rejected("Insufficient input bandwidth", isLocal);
+ return "Insufficient input bandwidth";
+ }
+
+ Runtime r = Runtime.getRuntime();
+ long maxHeapMemory = r.maxMemory();
+
+ /* There are some JVMs (for example libgcj 4.1.1) whose
Runtime.maxMemory() does not work. */
+ if(maxHeapMemory < Long.MAX_VALUE) { // would mean unlimited
+ long totalHeapMemory = r.totalMemory();
+ long freeHeapMemory = r.freeMemory();
+
+ freeHeapMemory = maxHeapMemory - (totalHeapMemory -
freeHeapMemory);
+
+ if(freeHeapMemory < freeHeapBytesThreshold) {
+ pInstantRejectIncoming.report(1.0);
+ rejected("<freeHeapBytesThreshold", isLocal);
+ return "<freeHeapBytesThreshold
("+SizeUtil.formatSize(freeHeapMemory, false)+" of
"+SizeUtil.formatSize(maxHeapMemory, false)+')';
+ }
+ double percentFreeHeapMemoryOfMax = ((double) freeHeapMemory) /
((double) maxHeapMemory);
+ double freeHeapPercentThresholdDouble = ((double)
freeHeapPercentThreshold) / ((double) 100);
+ if(percentFreeHeapMemoryOfMax < freeHeapPercentThresholdDouble)
{
+ pInstantRejectIncoming.report(1.0);
+ DecimalFormat fix3p1pct = new DecimalFormat("##0.0%");
+ rejected("<freeHeapPercentThreshold", isLocal);
+ return "<freeHeapPercentThreshold
("+SizeUtil.formatSize(freeHeapMemory, false)+" of
"+SizeUtil.formatSize(maxHeapMemory, false)+"
("+fix3p1pct.format(percentFreeHeapMemoryOfMax)+"))";
+ }
+ }
+
+ if(source != null) {
+ long queuedBytes = source.getMessageQueueLengthBytes();
+ if(queuedBytes > MAX_PEER_QUEUE_BYTES)
+ return "Too many message bytes queued for peer";
+ if(queuedBytes /
(source.getThrottle().getBandwidth()+1.0) > MAX_PEER_QUEUE_TIME)
+ return "Peer's queue will take too long to
transfer";
+ }
+
+ synchronized(this) {
+ if(logMINOR) Logger.minor(this, "Accepting request?");
+ lastAcceptedRequest = now;
+ }
+
+ pInstantRejectIncoming.report(0.0);
+
+ // Accept
+ return null;
+ }
+
+ private void rejected(String reason, boolean isLocal) {
+ if(!isLocal) preemptiveRejectReasons.inc(reason);
+ else this.localPreemptiveRejectReasons.inc(reason);
+ }
+
+ private RunningAverage getThrottle(boolean isLocal, boolean isInsert,
boolean isSSK, boolean isSent) {
+ if(isLocal) {
+ if(isInsert) {
+ if(isSSK) {
+ return isSent ?
this.localSskInsertBytesSentAverage : this.localSskInsertBytesReceivedAverage;
+ } else {
+ return isSent ?
this.localChkInsertBytesSentAverage : this.localChkInsertBytesReceivedAverage;
+ }
+ } else {
+ if(isSSK) {
+ return isSent ?
this.localSskFetchBytesSentAverage : this.localSskFetchBytesReceivedAverage;
+ } else {
+ return isSent ?
this.localChkFetchBytesSentAverage : this.localChkFetchBytesReceivedAverage;
+ }
+ }
+ } else {
+ if(isInsert) {
+ if(isSSK) {
+ return isSent ?
this.remoteSskInsertBytesSentAverage : this.remoteSskInsertBytesReceivedAverage;
+ } else {
+ return isSent ?
this.remoteChkInsertBytesSentAverage : this.remoteChkInsertBytesReceivedAverage;
+ }
+ } else {
+ if(isSSK) {
+ return isSent ?
this.remoteSskFetchBytesSentAverage : this.remoteSskFetchBytesReceivedAverage;
+ } else {
+ return isSent ?
this.remoteChkFetchBytesSentAverage : this.remoteChkFetchBytesReceivedAverage;
+ }
+ }
+ }
+ }
+
+ private void dumpByteCostAverages() {
+ Logger.minor(this, "Byte cost averages: REMOTE:"+
+ " CHK insert
"+remoteChkInsertBytesSentAverage.currentValue()+ '/'
+remoteChkInsertBytesReceivedAverage.currentValue()+
+ " SSK insert
"+remoteSskInsertBytesSentAverage.currentValue()+ '/'
+remoteSskInsertBytesReceivedAverage.currentValue()+
+ " CHK fetch
"+remoteChkFetchBytesSentAverage.currentValue()+ '/'
+remoteChkFetchBytesReceivedAverage.currentValue()+
+ " SSK fetch
"+remoteSskFetchBytesSentAverage.currentValue()+ '/'
+remoteSskFetchBytesReceivedAverage.currentValue());
+ Logger.minor(this, "Byte cost averages: LOCAL:"+
+ " CHK insert
"+localChkInsertBytesSentAverage.currentValue()+ '/'
+localChkInsertBytesReceivedAverage.currentValue()+
+ " SSK insert
"+localSskInsertBytesSentAverage.currentValue()+ '/'
+localSskInsertBytesReceivedAverage.currentValue()+
+ " CHK fetch
"+localChkFetchBytesSentAverage.currentValue()+ '/'
+localChkFetchBytesReceivedAverage.currentValue()+
+ " SSK fetch
"+localSskFetchBytesSentAverage.currentValue()+ '/'
+localSskFetchBytesReceivedAverage.currentValue());
+ Logger.minor(this, "Byte cost averages: SUCCESSFUL:"+
+ " CHK insert
"+successfulChkInsertBytesSentAverage.currentValue()+ '/'
+successfulChkInsertBytesReceivedAverage.currentValue()+
+ " SSK insert
"+successfulSskInsertBytesSentAverage.currentValue()+ '/'
+successfulSskInsertBytesReceivedAverage.currentValue()+
+ " CHK fetch
"+successfulChkFetchBytesSentAverage.currentValue()+ '/'
+successfulChkFetchBytesReceivedAverage.currentValue()+
+ " SSK fetch
"+successfulSskFetchBytesSentAverage.currentValue()+ '/'
+successfulSskFetchBytesReceivedAverage.currentValue());
+
+ }
+
+ public double getBwlimitDelayTime() {
+ return throttledPacketSendAverage.currentValue();
+ }
+
+ public double getNodeAveragePingTime() {
+ return nodePinger.averagePingTime();
+ }
+
+ public int getNetworkSizeEstimate(long timestamp) {
+ return node.lm.getNetworkSizeEstimate( timestamp );
+ }
+
+ public Object[] getKnownLocations(long timestamp) {
+ return node.lm.getKnownLocations( timestamp );
+ }
+
+ public double pRejectIncomingInstantly() {
+ return pInstantRejectIncoming.currentValue();
+ }
+
+ /**
+ * Update peerManagerUserAlertStats if the timer has expired.
+ * Only called from PacketSender so doesn't need sync.
+ */
+ public void maybeUpdatePeerManagerUserAlertStats(long now) {
+ if(now > nextPeerManagerUserAlertStatsUpdateTime) {
+ if(getBwlimitDelayTime() >
MAX_BWLIMIT_DELAY_TIME_ALERT_THRESHOLD) {
+ if(firstBwlimitDelayTimeThresholdBreak == 0) {
+ firstBwlimitDelayTimeThresholdBreak =
now;
+ }
+ } else {
+ firstBwlimitDelayTimeThresholdBreak = 0;
+ }
+ if((firstBwlimitDelayTimeThresholdBreak != 0) && ((now
- firstBwlimitDelayTimeThresholdBreak) >= MAX_BWLIMIT_DELAY_TIME_ALERT_DELAY)) {
+ bwlimitDelayAlertRelevant = true;
+ } else {
+ bwlimitDelayAlertRelevant = false;
+ }
+ if(getNodeAveragePingTime() >
MAX_NODE_AVERAGE_PING_TIME_ALERT_THRESHOLD) {
+ if(firstNodeAveragePingTimeThresholdBreak == 0)
{
+ firstNodeAveragePingTimeThresholdBreak
= now;
+ }
+ } else {
+ firstNodeAveragePingTimeThresholdBreak = 0;
+ }
+ if((firstNodeAveragePingTimeThresholdBreak != 0) &&
((now - firstNodeAveragePingTimeThresholdBreak) >=
MAX_NODE_AVERAGE_PING_TIME_ALERT_DELAY)) {
+ nodeAveragePingAlertRelevant = true;
+ } else {
+ nodeAveragePingAlertRelevant = false;
+ }
+ if(logMINOR && Logger.shouldLog(Logger.DEBUG, this))
Logger.debug(this, "mUPMUAS: "+now+": "+getBwlimitDelayTime()+" >?
"+MAX_BWLIMIT_DELAY_TIME_ALERT_THRESHOLD+" since
"+firstBwlimitDelayTimeThresholdBreak+" ("+bwlimitDelayAlertRelevant+")
"+getNodeAveragePingTime()+" >? "+MAX_NODE_AVERAGE_PING_TIME_ALERT_THRESHOLD+"
since "+firstNodeAveragePingTimeThresholdBreak+"
("+nodeAveragePingAlertRelevant+ ')');
+ nextPeerManagerUserAlertStatsUpdateTime = now +
peerManagerUserAlertStatsUpdateInterval;
+ }
+ }
+
+ public SimpleFieldSet persistThrottlesToFieldSet() {
+ SimpleFieldSet fs = new SimpleFieldSet(true);
+ fs.put("RemoteChkFetchBytesSentAverage",
remoteChkFetchBytesSentAverage.exportFieldSet(true));
+ fs.put("RemoteSskFetchBytesSentAverage",
remoteSskFetchBytesSentAverage.exportFieldSet(true));
+ fs.put("RemoteChkInsertBytesSentAverage",
remoteChkInsertBytesSentAverage.exportFieldSet(true));
+ fs.put("RemoteSskInsertBytesSentAverage",
remoteSskInsertBytesSentAverage.exportFieldSet(true));
+ fs.put("RemoteChkFetchBytesReceivedAverage",
remoteChkFetchBytesReceivedAverage.exportFieldSet(true));
+ fs.put("RemoteSskFetchBytesReceivedAverage",
remoteSskFetchBytesReceivedAverage.exportFieldSet(true));
+ fs.put("RemoteChkInsertBytesReceivedAverage",
remoteChkInsertBytesReceivedAverage.exportFieldSet(true));
+ fs.put("RemoteSskInsertBytesReceivedAverage",
remoteSskInsertBytesReceivedAverage.exportFieldSet(true));
+ fs.put("LocalChkFetchBytesSentAverage",
localChkFetchBytesSentAverage.exportFieldSet(true));
+ fs.put("LocalSskFetchBytesSentAverage",
localSskFetchBytesSentAverage.exportFieldSet(true));
+ fs.put("LocalChkInsertBytesSentAverage",
localChkInsertBytesSentAverage.exportFieldSet(true));
+ fs.put("LocalSskInsertBytesSentAverage",
localSskInsertBytesSentAverage.exportFieldSet(true));
+ fs.put("LocalChkFetchBytesReceivedAverage",
localChkFetchBytesReceivedAverage.exportFieldSet(true));
+ fs.put("LocalSskFetchBytesReceivedAverage",
localSskFetchBytesReceivedAverage.exportFieldSet(true));
+ fs.put("LocalChkInsertBytesReceivedAverage",
localChkInsertBytesReceivedAverage.exportFieldSet(true));
+ fs.put("LocalSskInsertBytesReceivedAverage",
localSskInsertBytesReceivedAverage.exportFieldSet(true));
+ fs.put("SuccessfulChkFetchBytesSentAverage",
successfulChkFetchBytesSentAverage.exportFieldSet(true));
+ fs.put("SuccessfulSskFetchBytesSentAverage",
successfulSskFetchBytesSentAverage.exportFieldSet(true));
+ fs.put("SuccessfulChkInsertBytesSentAverage",
successfulChkInsertBytesSentAverage.exportFieldSet(true));
+ fs.put("SuccessfulSskInsertBytesSentAverage",
successfulSskInsertBytesSentAverage.exportFieldSet(true));
+ fs.put("SuccessfulChkFetchBytesReceivedAverage",
successfulChkFetchBytesReceivedAverage.exportFieldSet(true));
+ fs.put("SuccessfulSskFetchBytesReceivedAverage",
successfulSskFetchBytesReceivedAverage.exportFieldSet(true));
+ fs.put("SuccessfulChkInsertBytesReceivedAverage",
successfulChkInsertBytesReceivedAverage.exportFieldSet(true));
+ fs.put("SuccessfulSskInsertBytesReceivedAverage",
successfulSskInsertBytesReceivedAverage.exportFieldSet(true));
+ return fs;
+ }
+
+ /**
+ * Update the node-wide bandwidth I/O stats if the timer has expired
+ */
+ public void maybeUpdateNodeIOStats(long now) {
+ if(now > nextNodeIOStatsUpdateTime) {
+ long[] io_stats = IOStatisticCollector.getTotalIO();
+ long outdiff;
+ long indiff;
+ synchronized(ioStatSync) {
+ previous_output_stat = last_output_stat;
+ previous_input_stat = last_input_stat;
+ previous_io_stat_time = last_io_stat_time;
+ last_output_stat = io_stats[ 0 ];
+ last_input_stat = io_stats[ 1 ];
+ last_io_stat_time = now;
+ outdiff = last_output_stat -
previous_output_stat;
+ indiff = last_input_stat - previous_input_stat;
+ }
+ if(logMINOR)
+ Logger.minor(this, "Last 2 seconds: input:
"+indiff+" output: "+outdiff);
+ nextNodeIOStatsUpdateTime = now +
nodeIOStatsUpdateInterval;
+ }
+ }
+
+ public long[] getNodeIOStats() {
+ long[] result = new long[6];
+ synchronized(ioStatSync) {
+ result[ 0 ] = previous_output_stat;
+ result[ 1 ] = previous_input_stat;
+ result[ 2 ] = previous_io_stat_time;
+ result[ 3 ] = last_output_stat;
+ result[ 4 ] = last_input_stat;
+ result[ 5 ] = last_io_stat_time;
+ }
+ return result;
+ }
+
+ public void waitUntilNotOverloaded(boolean isInsert) {
+ while(threadLimit < getActiveThreadCount()){
+ try{
+ Thread.sleep(5000);
+ } catch (InterruptedException e) {}
+ }
+ }
+
+ public int getActiveThreadCount() {
+ return rootThreadGroup.activeCount() -
node.executor.waitingThreads();
+ }
+
+ public int getThreadLimit() {
+ return threadLimit;
+ }
+
+ public SimpleFieldSet exportVolatileFieldSet() {
+ SimpleFieldSet fs = new SimpleFieldSet(true);
+ long now = System.currentTimeMillis();
+ fs.put("isUsingWrapper", node.isUsingWrapper());
+ long nodeUptimeSeconds = 0;
+ synchronized(this) {
+ fs.put("startupTime", node.startupTime);
+ nodeUptimeSeconds = (now - node.startupTime) / 1000;
+ fs.put("uptimeSeconds", nodeUptimeSeconds);
+ }
+ fs.put("averagePingTime", getNodeAveragePingTime());
+ fs.put("bwlimitDelayTime", getBwlimitDelayTime());
+ fs.put("networkSizeEstimateSession",
getNetworkSizeEstimate(-1));
+ int networkSizeEstimate24hourRecent =
getNetworkSizeEstimate(now - (24*60*60*1000)); // 24 hours
+ fs.put("networkSizeEstimate24hourRecent",
networkSizeEstimate24hourRecent);
+ int networkSizeEstimate48hourRecent =
getNetworkSizeEstimate(now - (48*60*60*1000)); // 48 hours
+ fs.put("networkSizeEstimate48hourRecent",
networkSizeEstimate48hourRecent);
+ fs.put("routingMissDistance",
routingMissDistance.currentValue());
+ fs.put("backedOffPercent", backedOffPercent.currentValue());
+ fs.put("pInstantReject", pRejectIncomingInstantly());
+ fs.put("unclaimedFIFOSize", node.usm.getUnclaimedFIFOSize());
+
+ /* gather connection statistics */
+ DarknetPeerNodeStatus[] peerNodeStatuses =
peers.getDarknetPeerNodeStatuses();
+ Arrays.sort(peerNodeStatuses, new Comparator() {
+ public int compare(Object first, Object second) {
+ DarknetPeerNodeStatus firstNode =
(DarknetPeerNodeStatus) first;
+ DarknetPeerNodeStatus secondNode =
(DarknetPeerNodeStatus) second;
+ int statusDifference =
firstNode.getStatusValue() - secondNode.getStatusValue();
+ if (statusDifference != 0) {
+ return statusDifference;
+ }
+ return
firstNode.getName().compareToIgnoreCase(secondNode.getName());
+ }
+ });
+
+ int numberOfConnected =
PeerNodeStatus.getPeerStatusCount(peerNodeStatuses,
PeerManager.PEER_NODE_STATUS_CONNECTED);
+ int numberOfRoutingBackedOff =
PeerNodeStatus.getPeerStatusCount(peerNodeStatuses,
PeerManager.PEER_NODE_STATUS_ROUTING_BACKED_OFF);
+ int numberOfTooNew =
PeerNodeStatus.getPeerStatusCount(peerNodeStatuses,
PeerManager.PEER_NODE_STATUS_TOO_NEW);
+ int numberOfTooOld =
PeerNodeStatus.getPeerStatusCount(peerNodeStatuses,
PeerManager.PEER_NODE_STATUS_TOO_OLD);
+ int numberOfDisconnected =
PeerNodeStatus.getPeerStatusCount(peerNodeStatuses,
PeerManager.PEER_NODE_STATUS_DISCONNECTED);
+ int numberOfNeverConnected =
PeerNodeStatus.getPeerStatusCount(peerNodeStatuses,
PeerManager.PEER_NODE_STATUS_NEVER_CONNECTED);
+ int numberOfDisabled =
PeerNodeStatus.getPeerStatusCount(peerNodeStatuses,
PeerManager.PEER_NODE_STATUS_DISABLED);
+ int numberOfBursting =
PeerNodeStatus.getPeerStatusCount(peerNodeStatuses,
PeerManager.PEER_NODE_STATUS_BURSTING);
+ int numberOfListening =
PeerNodeStatus.getPeerStatusCount(peerNodeStatuses,
PeerManager.PEER_NODE_STATUS_LISTENING);
+ int numberOfListenOnly =
PeerNodeStatus.getPeerStatusCount(peerNodeStatuses,
PeerManager.PEER_NODE_STATUS_LISTEN_ONLY);
+
+ int numberOfSimpleConnected = numberOfConnected +
numberOfRoutingBackedOff;
+ int numberOfNotConnected = numberOfTooNew + numberOfTooOld +
numberOfDisconnected + numberOfNeverConnected + numberOfDisabled +
numberOfBursting + numberOfListening + numberOfListenOnly;
+
+ fs.put("numberOfConnected", numberOfConnected);
+ fs.put("numberOfRoutingBackedOff", numberOfRoutingBackedOff);
+ fs.put("numberOfTooNew", numberOfTooNew);
+ fs.put("numberOfTooOld", numberOfTooOld);
+ fs.put("numberOfDisconnected", numberOfDisconnected);
+ fs.put("numberOfNeverConnected", numberOfNeverConnected);
+ fs.put("numberOfDisabled", numberOfDisabled);
+ fs.put("numberOfBursting", numberOfBursting);
+ fs.put("numberOfListening", numberOfListening);
+ fs.put("numberOfListenOnly", numberOfListenOnly);
+
+ fs.put("numberOfSimpleConnected", numberOfSimpleConnected);
+ fs.put("numberOfNotConnected", numberOfNotConnected);
+
+ fs.put("numberOfInsertSenders", node.getNumInsertSenders());
+ fs.put("numberOfRequestSenders", node.getNumRequestSenders());
+ fs.put("numberOfTransferringRequestSenders",
node.getNumTransferringRequestSenders());
+ fs.put("numberOfARKFetchers", node.getNumARKFetchers());
+
+ long[] total = IOStatisticCollector.getTotalIO();
+ long total_output_rate = (total[0]) / nodeUptimeSeconds;
+ long total_input_rate = (total[1]) / nodeUptimeSeconds;
+ long totalPayloadOutput = node.getTotalPayloadSent();
+ long total_payload_output_rate = totalPayloadOutput /
nodeUptimeSeconds;
+ int total_payload_output_percent = (int) (100 *
totalPayloadOutput / total[0]);
+ fs.put("totalOutputBytes", total[0]);
+ fs.put("totalOutputRate", total_output_rate);
+ fs.put("totalPayloadOutputBytes", totalPayloadOutput);
+ fs.put("totalPayloadOutputRate", total_payload_output_rate);
+ fs.put("totalPayloadOutputPercent",
total_payload_output_percent);
+ fs.put("totalInputBytes", total[1]);
+ fs.put("totalInputRate", total_input_rate);
+
+ long[] rate = getNodeIOStats();
+ long deltaMS = (rate[5] - rate[2]);
+ double recent_output_rate = 1000.0 * (rate[3] - rate[0]) /
deltaMS;
+ double recent_input_rate = 1000.0 * (rate[4] - rate[1]) /
deltaMS;
+ fs.put("recentOutputRate", recent_output_rate);
+ fs.put("recentInputRate", recent_input_rate);
+
+ String [] routingBackoffReasons =
peers.getPeerNodeRoutingBackoffReasons();
+ if(routingBackoffReasons.length != 0) {
+ for(int i=0;i<routingBackoffReasons.length;i++) {
+ fs.put("numberWithRoutingBackoffReasons." +
routingBackoffReasons[i],
peers.getPeerNodeRoutingBackoffReasonSize(routingBackoffReasons[i]));
+ }
+ }
+
+ double swaps = (double)node.getSwaps();
+ double noSwaps = (double)node.getNoSwaps();
+ double numberOfRemotePeerLocationsSeenInSwaps =
(double)node.getNumberOfRemotePeerLocationsSeenInSwaps();
+ fs.putSingle("numberOfRemotePeerLocationsSeenInSwaps",
Double.toString(numberOfRemotePeerLocationsSeenInSwaps));
+ double avgConnectedPeersPerNode = 0.0;
+ if ((numberOfRemotePeerLocationsSeenInSwaps > 0.0) && ((swaps >
0.0) || (noSwaps > 0.0))) {
+ avgConnectedPeersPerNode =
numberOfRemotePeerLocationsSeenInSwaps/(swaps+noSwaps);
+ }
+ fs.putSingle("avgConnectedPeersPerNode",
Double.toString(avgConnectedPeersPerNode));
+
+ int startedSwaps = node.getStartedSwaps();
+ int swapsRejectedAlreadyLocked =
node.getSwapsRejectedAlreadyLocked();
+ int swapsRejectedNowhereToGo =
node.getSwapsRejectedNowhereToGo();
+ int swapsRejectedRateLimit = node.getSwapsRejectedRateLimit();
+ int swapsRejectedLoop = node.getSwapsRejectedLoop();
+ int swapsRejectedRecognizedID =
node.getSwapsRejectedRecognizedID();
+ double locationChangePerSession =
node.getLocationChangeSession();
+ double locationChangePerSwap = 0.0;
+ double locationChangePerMinute = 0.0;
+ double swapsPerMinute = 0.0;
+ double noSwapsPerMinute = 0.0;
+ double swapsPerNoSwaps = 0.0;
+ if (swaps > 0) {
+ locationChangePerSwap = locationChangePerSession/swaps;
+ }
+ if ((swaps > 0.0) && (nodeUptimeSeconds >= 60)) {
+ locationChangePerMinute =
locationChangePerSession/(double)(nodeUptimeSeconds/60.0);
+ }
+ if ((swaps > 0.0) && (nodeUptimeSeconds >= 60)) {
+ swapsPerMinute = swaps/(double)(nodeUptimeSeconds/60.0);
+ }
+ if ((noSwaps > 0.0) && (nodeUptimeSeconds >= 60)) {
+ noSwapsPerMinute =
noSwaps/(double)(nodeUptimeSeconds/60.0);
+ }
+ if ((swaps > 0.0) && (noSwaps > 0.0)) {
+ swapsPerNoSwaps = swaps/noSwaps;
+ }
+ fs.put("locationChangePerSession", locationChangePerSession);
+ fs.put("locationChangePerSwap", locationChangePerSwap);
+ fs.put("locationChangePerMinute", locationChangePerMinute);
+ fs.put("swapsPerMinute", swapsPerMinute);
+ fs.put("noSwapsPerMinute", noSwapsPerMinute);
+ fs.put("swapsPerNoSwaps", swapsPerNoSwaps);
+ fs.put("swaps", swaps);
+ fs.put("noSwaps", noSwaps);
+ fs.put("startedSwaps", startedSwaps);
+ fs.put("swapsRejectedAlreadyLocked",
swapsRejectedAlreadyLocked);
+ fs.put("swapsRejectedNowhereToGo", swapsRejectedNowhereToGo);
+ fs.put("swapsRejectedRateLimit", swapsRejectedRateLimit);
+ fs.put("swapsRejectedLoop", swapsRejectedLoop);
+ fs.put("swapsRejectedRecognizedID", swapsRejectedRecognizedID);
+ long fix32kb = 32 * 1024;
+ long cachedKeys = node.getChkDatacache().keyCount();
+ long cachedSize = cachedKeys * fix32kb;
+ long storeKeys = node.getChkDatastore().keyCount();
+ long storeSize = storeKeys * fix32kb;
+ long overallKeys = cachedKeys + storeKeys;
+ long overallSize = cachedSize + storeSize;
+
+ long maxOverallKeys = node.getMaxTotalKeys();
+ long maxOverallSize = maxOverallKeys * fix32kb;
+
+ double percentOverallKeysOfMax =
(double)(overallKeys*100)/(double)maxOverallKeys;
+
+ long cachedStoreHits = node.getChkDatacache().hits();
+ long cachedStoreMisses = node.getChkDatacache().misses();
+ long cacheAccesses = cachedStoreHits + cachedStoreMisses;
+ double percentCachedStoreHitsOfAccesses =
(double)(cachedStoreHits*100) / (double)cacheAccesses;
+ long storeHits = node.getChkDatastore().hits();
+ long storeMisses = node.getChkDatastore().misses();
+ long storeAccesses = storeHits + storeMisses;
+ double percentStoreHitsOfAccesses = (double)(storeHits*100) /
(double)storeAccesses;
+ long overallAccesses = storeAccesses + cacheAccesses;
+ double avgStoreAccessRate =
(double)overallAccesses/(double)nodeUptimeSeconds;
+
+ fs.put("cachedKeys", cachedKeys);
+ fs.put("cachedSize", cachedSize);
+ fs.put("storeKeys", storeKeys);
+ fs.put("storeSize", storeSize);
+ fs.put("overallKeys", overallKeys);
+ fs.put("overallSize", overallSize);
+ fs.put("maxOverallKeys", maxOverallKeys);
+ fs.put("maxOverallSize", maxOverallSize);
+ fs.put("percentOverallKeysOfMax", percentOverallKeysOfMax);
+ fs.put("cachedStoreHits", cachedStoreHits);
+ fs.put("cachedStoreMisses", cachedStoreMisses);
+ fs.put("cacheAccesses", cacheAccesses);
+ fs.put("percentCachedStoreHitsOfAccesses",
percentCachedStoreHitsOfAccesses);
+ fs.put("storeHits", storeHits);
+ fs.put("storeMisses", storeMisses);
+ fs.put("storeAccesses", storeAccesses);
+ fs.put("percentStoreHitsOfAccesses",
percentStoreHitsOfAccesses);
+ fs.put("overallAccesses", overallAccesses);
+ fs.put("avgStoreAccessRate", avgStoreAccessRate);
+
+ Runtime rt = Runtime.getRuntime();
+ float freeMemory = (float) rt.freeMemory();
+ float totalMemory = (float) rt.totalMemory();
+ float maxMemory = (float) rt.maxMemory();
+
+ long usedJavaMem = (long)(totalMemory - freeMemory);
+ long allocatedJavaMem = (long)totalMemory;
+ long maxJavaMem = (long)maxMemory;
+ int availableCpus = rt.availableProcessors();
+
+ fs.put("freeJavaMemory", (long)freeMemory);
+ fs.put("usedJavaMemory", usedJavaMem);
+ fs.put("allocatedJavaMemory", allocatedJavaMem);
+ fs.put("maximumJavaMemory", maxJavaMem);
+ fs.put("availableCPUs", availableCpus);
+ fs.put("runningThreadCount", getActiveThreadCount());
+
+ return fs;
+ }
+
+ public void setOutputLimit(int obwLimit) {
+
requestOutputThrottle.changeNanosAndBucketSize((int)((1000L*1000L*1000L) /
(obwLimit * FRACTION_OF_BANDWIDTH_USED_BY_REQUESTS)), Math.max(obwLimit*60,
32768*20));
+ if(node.inputLimitDefault) {
+ setInputLimit(obwLimit * 4);
+ }
+ }
+
+ public void setInputLimit(int ibwLimit) {
+
requestInputThrottle.changeNanosAndBucketSize((int)((1000L*1000L*1000L) /
(ibwLimit * FRACTION_OF_BANDWIDTH_USED_BY_REQUESTS)), Math.max(ibwLimit*60,
32768*20));
+ }
+
+ public boolean isTestnetEnabled() {
+ return node.isTestnetEnabled();
+ }
+
+ public boolean getRejectReasonsTable(HTMLNode table) {
+ return preemptiveRejectReasons.toTableRows(table) > 0;
+ }
+
+ public boolean getLocalRejectReasonsTable(HTMLNode table) {
+ return localPreemptiveRejectReasons.toTableRows(table) > 0;
+ }
+
+ public void requestCompleted(boolean succeeded, boolean isRemote,
boolean isSSK) {
+ globalFetchPSuccess.report(succeeded ? 1.0 : 0.0);
+ if(isSSK)
+ sskFetchPSuccess.report(succeeded ? 1.0 : 0.0);
+ else
+ chkFetchPSuccess.report(succeeded ? 1.0 : 0.0);
+ if(isRemote)
+ remoteFetchPSuccess.report(succeeded ? 1.0 : 0.0);
+ else
+ localFetchPSuccess.report(succeeded ? 1.0 : 0.0);
+ }
+
+ private final DecimalFormat fix3p3pct = new DecimalFormat("##0.000%");
+ private final NumberFormat thousendPoint = NumberFormat.getInstance();
+
+ public void fillSuccessRateBox(HTMLNode parent) {
+ HTMLNode list = parent.addChild("table", "border", "0");
+ final TrivialRunningAverage[] averages = new
TrivialRunningAverage[] {
+ globalFetchPSuccess,
+ chkFetchPSuccess,
+ sskFetchPSuccess,
+ localFetchPSuccess,
+ remoteFetchPSuccess
+ };
+ final String[] names = new String[] {
+ // FIXME l10n, but atm this only shows up in
advanced mode
+ "All requests",
+ "CHKs",
+ "SSKs",
+ "Local requests",
+ "Remote requests"
+ };
+ HTMLNode row = list.addChild("tr");
+ row.addChild("th", "Group");
+ row.addChild("th", "P(success)");
+ row.addChild("th", "Count");
+
+ for(int i=0;i<averages.length;i++) {
+ row = list.addChild("tr");
+ row.addChild("td", names[i]);
+ if (averages[i].countReports()==0) {
+ row.addChild("td", "-");
+ row.addChild("td", "0");
+ } else {
+ row.addChild("td",
fix3p3pct.format(averages[i].currentValue()));
+ row.addChild("td",
thousendPoint.format(averages[i].countReports()));
+ }
+ }
+ }
+
+}