What happened here? Did you (xor) delete the file and then re-add it? Please
don't do that; it makes the diffs extremely difficult to read.
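If the old NodeStats.java was removed with svn delete and a fresh copy then
committed with svn add, Subversion treats it as a brand-new file, so the commit
mail shows the entire file deleted and re-added, as below. Editing the existing
file in place and committing it produces a normal line-level diff, and running
svn diff before svn commit shows exactly what the commit mail will contain.
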
On Sunday 23 December 2007 10:54, xor at freenetproject.org wrote:
> Author: xor
> Date: 2007-12-23 10:54:03 +0000 (Sun, 23 Dec 2007)
> New Revision: 16802
>
> Modified:
> trunk/freenet/src/freenet/node/NodeStats.java
> Log:
> - test my SVN access :)
> - workaround for bug 0001554: libgcj reports infinite maximum heap memory, this broke the free memory calculation in shouldRejectRequest()
>
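For anyone hitting the same libgcj problem: the essence of such a workaround is
to treat Runtime.maxMemory() returning Long.MAX_VALUE as "no explicit heap
limit" and to stop basing the free-memory checks on that maximum. The sketch
below is illustrative only; the exact r16802 change is hard to read out of a
full-file delete/re-add diff, the guard pattern matches what is visible in the
quoted shouldRejectRequest() body, and the class and method names here are made
up for the example.

    // Illustrative sketch, not the exact r16802 code: guard the free-heap
    // estimate against a JVM (e.g. libgcj, bug 0001554) that reports
    // Long.MAX_VALUE from Runtime.maxMemory(), i.e. "no explicit limit".
    final class FreeHeapEstimate {

        /** Estimated free heap in bytes, safe against an "infinite" maxMemory(). */
        static long freeHeapBytes() {
            Runtime r = Runtime.getRuntime();
            long max = r.maxMemory();     // Long.MAX_VALUE on libgcj
            long total = r.totalMemory(); // heap currently reserved from the OS
            long free = r.freeMemory();   // unused space inside that reserved heap
            if(max < Long.MAX_VALUE) {
                // A real limit is known: the unallocated headroom (max - total)
                // is also available, so count it as free.
                free = max - (total - free);
            }
            return free;
        }

        /** Fraction of the maximum heap that is free; only meaningful with a real limit. */
        static double freeHeapFractionOfMax() {
            long max = Runtime.getRuntime().maxMemory();
            if(max == Long.MAX_VALUE)
                return 1.0; // no limit: a percent-of-max threshold must never reject
            return ((double) freeHeapBytes()) / ((double) max);
        }
    }

Without the second guard, freeMemory()/Long.MAX_VALUE is effectively 0%, so the
freeHeapPercentThreshold test (default 5% in the quoted code) would reject every
request on libgcj, which is presumably the breakage the log message refers to.
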
> Modified: trunk/freenet/src/freenet/node/NodeStats.java
> ===================================================================
> --- trunk/freenet/src/freenet/node/NodeStats.java 2007-12-23 00:28:47 UTC (rev 16801)
> +++ trunk/freenet/src/freenet/node/NodeStats.java 2007-12-23 10:54:03 UTC (rev 16802)
> @@ -1,1031 +1,1035 @@
> -package freenet.node;
> -
> -import java.io.File;
> -import java.text.DecimalFormat;
> -import java.text.NumberFormat;
> -import java.util.Arrays;
> -import java.util.Comparator;
> -
> -import freenet.config.InvalidConfigValueException;
> -import freenet.config.SubConfig;
> -import freenet.crypt.RandomSource;
> -import freenet.io.comm.DMT;
> -import freenet.io.comm.IOStatisticCollector;
> -import freenet.l10n.L10n;
> -import freenet.support.HTMLNode;
> -import freenet.support.Logger;
> -import freenet.support.SimpleFieldSet;
> -import freenet.support.SizeUtil;
> -import freenet.support.StringCounter;
> -import freenet.support.TimeUtil;
> -import freenet.support.TokenBucket;
> -import freenet.support.api.BooleanCallback;
> -import freenet.support.api.IntCallback;
> -import freenet.support.api.LongCallback;
> -import freenet.support.math.BootstrappingDecayingRunningAverage;
> -import freenet.support.math.RunningAverage;
> -import freenet.support.math.TimeDecayingRunningAverage;
> -import freenet.support.math.TrivialRunningAverage;
> -
> -/** Node (as opposed to NodeClientCore) level statistics. Includes
shouldRejectRequest(), but not limited
> - * to stuff required to implement that. */
> -public class NodeStats implements Persistable {
> -
> - /** Sub-max ping time. If ping is greater than this, we reject some
requests. */
> - public static final long SUB_MAX_PING_TIME = 700;
> - /** Maximum overall average ping time. If ping is greater than this,
> - * we reject all requests. */
> - public static final long MAX_PING_TIME = 1500;
> - /** Maximum throttled packet delay. If the throttled packet delay is
greater
> - * than this, reject all packets. */
> - public static final long MAX_THROTTLE_DELAY = 3000;
> - /** If the throttled packet delay is less than this, reject no packets;
> if
it's
> - * between the two, reject some packets. */
> - public static final long SUB_MAX_THROTTLE_DELAY = 2000;
> - /** How high can bwlimitDelayTime be before we alert (in milliseconds)*/
> - public static final long MAX_BWLIMIT_DELAY_TIME_ALERT_THRESHOLD =
MAX_THROTTLE_DELAY*2;
> - /** How high can nodeAveragePingTime be before we alert (in
milliseconds)*/
> - public static final long MAX_NODE_AVERAGE_PING_TIME_ALERT_THRESHOLD =
MAX_PING_TIME*2;
> - /** How long we're over the bwlimitDelayTime threshold before we alert
> (in
milliseconds)*/
> - public static final long MAX_BWLIMIT_DELAY_TIME_ALERT_DELAY =
10*60*1000; // 10 minutes
> - /** How long we're over the nodeAveragePingTime threshold before we
> alert
(in milliseconds)*/
> - public static final long MAX_NODE_AVERAGE_PING_TIME_ALERT_DELAY =
10*60*1000; // 10 minutes
> - /** Accept one request every 10 seconds regardless, to ensure we update
the
> - * block send time.
> - */
> - public static final int MAX_INTERREQUEST_TIME = 10*1000;
> -
> - /** Fudge factor for high level bandwidth limiting. FIXME should be a
> long
term running average */
> - public static final double FRACTION_OF_BANDWIDTH_USED_BY_REQUESTS = 0.8;
> -
> - private final Node node;
> - private MemoryChecker myMemoryChecker;
> - public final PeerManager peers;
> -
> - final RandomSource hardRandom;
> -
> - private boolean logMINOR;
> -
> - /** first time bwlimitDelay was over PeerManagerUserAlert threshold */
> - private long firstBwlimitDelayTimeThresholdBreak ;
> - /** first time nodeAveragePing was over PeerManagerUserAlert threshold
> */
> - private long firstNodeAveragePingTimeThresholdBreak;
> - /** bwlimitDelay PeerManagerUserAlert should happen if true */
> - public boolean bwlimitDelayAlertRelevant;
> - /** nodeAveragePing PeerManagerUserAlert should happen if true */
> - public boolean nodeAveragePingAlertRelevant;
> - /** Average proportion of requests rejected immediately due to overload
> */
> - public final TimeDecayingRunningAverage pInstantRejectIncoming;
> -
> - /** Average delay caused by throttling for sending a packet */
> - final TimeDecayingRunningAverage throttledPacketSendAverage;
> -
> - // Bytes used by each different type of local/remote chk/ssk
request/insert
> - final TimeDecayingRunningAverage remoteChkFetchBytesSentAverage;
> - final TimeDecayingRunningAverage remoteSskFetchBytesSentAverage;
> - final TimeDecayingRunningAverage remoteChkInsertBytesSentAverage;
> - final TimeDecayingRunningAverage remoteSskInsertBytesSentAverage;
> - final TimeDecayingRunningAverage remoteChkFetchBytesReceivedAverage;
> - final TimeDecayingRunningAverage remoteSskFetchBytesReceivedAverage;
> - final TimeDecayingRunningAverage remoteChkInsertBytesReceivedAverage;
> - final TimeDecayingRunningAverage remoteSskInsertBytesReceivedAverage;
> - final TimeDecayingRunningAverage localChkFetchBytesSentAverage;
> - final TimeDecayingRunningAverage localSskFetchBytesSentAverage;
> - final TimeDecayingRunningAverage localChkInsertBytesSentAverage;
> - final TimeDecayingRunningAverage localSskInsertBytesSentAverage;
> - final TimeDecayingRunningAverage localChkFetchBytesReceivedAverage;
> - final TimeDecayingRunningAverage localSskFetchBytesReceivedAverage;
> - final TimeDecayingRunningAverage localChkInsertBytesReceivedAverage;
> - final TimeDecayingRunningAverage localSskInsertBytesReceivedAverage;
> -
> - // Bytes used by successful chk/ssk request/insert.
> - // Note: These are used to determine whether to accept a request,
> - // hence they should be roughly representative of incoming - NOT LOCAL -
> - // requests. Therefore, while we DO report local successful requests,
> - // we only report the portion which will be consistent with a remote
> - // request. If there is both a Handler and a Sender, it's a remote
> - // request, report both. If there is only a Sender, report only the
> - // received bytes (for a request). Etc.
> -
> - // Note that these are always reported in the Handler or the
NodeClientCore
> - // call taking its place.
> - final TimeDecayingRunningAverage successfulChkFetchBytesSentAverage;
> - final TimeDecayingRunningAverage successfulSskFetchBytesSentAverage;
> - final TimeDecayingRunningAverage successfulChkInsertBytesSentAverage;
> - final TimeDecayingRunningAverage successfulSskInsertBytesSentAverage;
> - final TimeDecayingRunningAverage successfulChkFetchBytesReceivedAverage;
> - final TimeDecayingRunningAverage successfulSskFetchBytesReceivedAverage;
> - final TimeDecayingRunningAverage
> successfulChkInsertBytesReceivedAverage;
> - final TimeDecayingRunningAverage
> successfulSskInsertBytesReceivedAverage;
> -
> - final TrivialRunningAverage globalFetchPSuccess;
> - final TrivialRunningAverage chkFetchPSuccess;
> - final TrivialRunningAverage sskFetchPSuccess;
> - final TrivialRunningAverage localFetchPSuccess;
> - final TrivialRunningAverage remoteFetchPSuccess;
> -
> - File persistTarget;
> - File persistTemp;
> - private long previous_input_stat;
> - private long previous_output_stat;
> - private long previous_io_stat_time;
> - private long last_input_stat;
> - private long last_output_stat;
> - private long last_io_stat_time;
> - private final Object ioStatSync = new Object();
> - /** Next time to update the node I/O stats */
> - private long nextNodeIOStatsUpdateTime = -1;
> - /** Node I/O stats update interval (milliseconds) */
> - private static final long nodeIOStatsUpdateInterval = 2000;
> -
> - /** Token bucket for output bandwidth used by requests */
> - final TokenBucket requestOutputThrottle;
> - /** Token bucket for input bandwidth used by requests */
> - final TokenBucket requestInputThrottle;
> -
> - // various metrics
> - public final RunningAverage routingMissDistance;
> - public final RunningAverage backedOffPercent;
> - public final RunningAverage avgCacheLocation;
> - public final RunningAverage avgStoreLocation;
> - public final RunningAverage avgCacheSuccess;
> - public final RunningAverage avgStoreSuccess;
> - // FIXME: does furthest{Store,Cache}Success need to be synchronized?
> - public double furthestCacheSuccess=0.0;
> - public double furthestStoreSuccess=0.0;
> - protected final Persister persister;
> -
> - protected final RunningAverage avgRequestLocation;
> -
> - // ThreadCounting stuffs
> - public final ThreadGroup rootThreadGroup;
> - private int threadLimit;
> -
> - // Free heap memory threshold stuffs
> - private long freeHeapBytesThreshold;
> - private int freeHeapPercentThreshold;
> -
> - final NodePinger nodePinger;
> -
> - final StringCounter preemptiveRejectReasons;
> - final StringCounter localPreemptiveRejectReasons;
> -
> - // Enable this if you run into hard to debug OOMs.
> - // Disabled to prevent long pauses every 30 seconds.
> - private int aggressiveGCModificator = -1 /*250*/;
> -
> - // Peers stats
> - /** Next time to update PeerManagerUserAlert stats */
> - private long nextPeerManagerUserAlertStatsUpdateTime = -1;
> - /** PeerManagerUserAlert stats update interval (milliseconds) */
> - private static final long peerManagerUserAlertStatsUpdateInterval =
1000; // 1 second
> -
> - NodeStats(Node node, int sortOrder, SubConfig statsConfig,
> SimpleFieldSet
oldThrottleFS, int obwLimit, int ibwLimit) throws NodeInitException {
> - logMINOR = Logger.shouldLog(Logger.MINOR, this);
> - this.node = node;
> - this.peers = node.peers;
> - this.hardRandom = node.random;
> - this.routingMissDistance = new TimeDecayingRunningAverage(0.0,
> 180000,
0.0, 1.0, node);
> - this.backedOffPercent = new TimeDecayingRunningAverage(0.0,
> 180000, 0.0,
1.0, node);
> - double nodeLoc=node.lm.getLocation();
> - // FIXME PLEASE; (int) casts; (maxCacheKeys>MAXINT?) This value
> will
probably end up being a small constant anyway (200?).
> - this.avgCacheLocation = new
BootstrappingDecayingRunningAverage(nodeLoc, 0.0, 1.0,
(int)node.maxCacheKeys, null);
> - this.avgStoreLocation = new
BootstrappingDecayingRunningAverage(nodeLoc, 0.0, 1.0,
(int)node.maxStoreKeys, null);
> - this.avgCacheSuccess = new
BootstrappingDecayingRunningAverage(nodeLoc, 0.0, 1.0, 10000, null);
> - this.avgStoreSuccess = new
BootstrappingDecayingRunningAverage(nodeLoc, 0.0, 1.0, 10000, null);
> - this.avgRequestLocation = new
BootstrappingDecayingRunningAverage(nodeLoc, 0.0, 1.0, 10000, null);
> - preemptiveRejectReasons = new StringCounter();
> - localPreemptiveRejectReasons = new StringCounter();
> - pInstantRejectIncoming = new TimeDecayingRunningAverage(0,
> 60000, 0.0,
1.0, node);
> - ThreadGroup tg = Thread.currentThread().getThreadGroup();
> - while(tg.getParent() != null) tg = tg.getParent();
> - this.rootThreadGroup = tg;
> - throttledPacketSendAverage =
> - new TimeDecayingRunningAverage(1, 10*60*1000 /* should
> be significantly
longer than a typical transfer */, 0, Long.MAX_VALUE, node);
> - nodePinger = new NodePinger(node);
> -
> - previous_input_stat = 0;
> - previous_output_stat = 0;
> - previous_io_stat_time = 1;
> - last_input_stat = 0;
> - last_output_stat = 0;
> - last_io_stat_time = 3;
> -
> - statsConfig.register("threadLimit", 500, sortOrder++, true,
true, "NodeStat.threadLimit", "NodeStat.threadLimitLong",
> - new IntCallback() {
> - public int get() {
> - return threadLimit;
> - }
> - public void set(int val) throws
> InvalidConfigValueException {
> - if(val == get()) return;
> - if(val < 100)
> - throw new
> InvalidConfigValueException(l10n("valueTooLow"));
> - threadLimit = val;
> - }
> - });
> - threadLimit = statsConfig.getInt("threadLimit");
> -
> - // Yes it could be in seconds insteed of multiples of 0.12, but
> we don't
want people to play with it :)
> - statsConfig.register("aggressiveGC", aggressiveGCModificator,
sortOrder++, true,
false, "NodeStat.aggressiveGC", "NodeStat.aggressiveGCLong",
> - new IntCallback() {
> - public int get() {
> - return aggressiveGCModificator;
> - }
> - public void set(int val) throws
> InvalidConfigValueException {
> - if(val == get()) return;
> - Logger.normal(this, "Changing
> aggressiveGCModificator to "+val);
> - aggressiveGCModificator = val;
> - }
> - });
> - aggressiveGCModificator = statsConfig.getInt("aggressiveGC");
> -
> - myMemoryChecker = new MemoryChecker(node.ps,
> aggressiveGCModificator);
> - statsConfig.register("memoryChecker", true, sortOrder++, true,
false, "NodeStat.memCheck", "NodeStat.memCheckLong",
> - new BooleanCallback(){
> - public boolean get() {
> - return
> myMemoryChecker.isRunning();
> - }
> -
> - public void set(boolean val) throws
> InvalidConfigValueException {
> - if(val == get()) return;
> -
> - if(val)
> - myMemoryChecker.start();
> - else
> -
> myMemoryChecker.terminate();
> - }
> - });
> - if(statsConfig.getBoolean("memoryChecker"))
> - myMemoryChecker.start();
> -
> - statsConfig.register("freeHeapBytesThreshold", "5M",
> sortOrder++, true,
true, "NodeStat.freeHeapBytesThreshold", "NodeStat.freeHeapBytesThresholdLong",
> - new LongCallback() {
> - public long get() {
> - return freeHeapBytesThreshold;
> - }
> - public void set(long val) throws
> InvalidConfigValueException {
> - if(val == get()) return;
> - if(val < 0)
> - throw new
> InvalidConfigValueException(l10n("valueTooLow"));
> - freeHeapBytesThreshold = val;
> - }
> - });
> - freeHeapBytesThreshold =
> statsConfig.getLong("freeHeapBytesThreshold");
> -
> - statsConfig.register("freeHeapPercentThreshold", "5",
> sortOrder++, true,
true, "NodeStat.freeHeapPercentThreshold",
"NodeStat.freeHeapPercentThresholdLong",
> - new IntCallback() {
> - public int get() {
> - return freeHeapPercentThreshold;
> - }
> - public void set(int val) throws
> InvalidConfigValueException {
> - if(val == get()) return;
> - if(val < 0 || val >= 100)
> - throw new
InvalidConfigValueException(l10n("mustBePercentValueNotFull"));
> - freeHeapPercentThreshold = val;
> - }
> - });
> - freeHeapPercentThreshold =
statsConfig.getInt("freeHeapPercentThreshold");
> -
> - persister = new ConfigurablePersister(this,
statsConfig, "nodeThrottleFile", "node-throttle.dat", sortOrder++, true,
false,
> - "NodeStat.statsPersister",
> "NodeStat.statsPersisterLong", node.ps);
> -
> - SimpleFieldSet throttleFS = persister.read();
> -
> - if(throttleFS == null)
> - throttleFS = oldThrottleFS;
> -
> - if(logMINOR) Logger.minor(this, "Read
> throttleFS:\n"+throttleFS);
> -
> - // Guesstimates. Hopefully well over the reality.
> - localChkFetchBytesSentAverage = new
> TimeDecayingRunningAverage(500,
180000, 0.0, 1024*1024*1024, throttleFS == null ? null :
throttleFS.subset("LocalChkFetchBytesSentAverage"), node);
> - localSskFetchBytesSentAverage = new
> TimeDecayingRunningAverage(500,
180000, 0.0, 1024*1024*1024, throttleFS == null ? null :
throttleFS.subset("LocalSskFetchBytesSentAverage"), node);
> - localChkInsertBytesSentAverage = new
> TimeDecayingRunningAverage(32768,
180000, 0.0, 1024*1024*1024, throttleFS == null ? null :
throttleFS.subset("LocalChkInsertBytesSentAverage"), node);
> - localSskInsertBytesSentAverage = new
> TimeDecayingRunningAverage(2048,
180000, 0.0, 1024*1024*1024, throttleFS == null ? null :
throttleFS.subset("LocalSskInsertBytesSentAverage"), node);
> - localChkFetchBytesReceivedAverage = new
TimeDecayingRunningAverage(32768+2048/*path folding*/, 180000, 0.0,
1024*1024*1024, throttleFS == null ? null :
throttleFS.subset("LocalChkFetchBytesReceivedAverage"), node);
> - localSskFetchBytesReceivedAverage = new
> TimeDecayingRunningAverage(2048,
180000, 0.0, 1024*1024*1024, throttleFS == null ? null :
throttleFS.subset("LocalSskFetchBytesReceivedAverage"), node);
> - localChkInsertBytesReceivedAverage = new
> TimeDecayingRunningAverage(1024,
180000, 0.0, 1024*1024*1024, throttleFS == null ? null :
throttleFS.subset("LocalChkInsertBytesReceivedAverage"), node);
> - localSskInsertBytesReceivedAverage = new
> TimeDecayingRunningAverage(500,
180000, 0.0, 1024*1024*1024, throttleFS == null ? null :
throttleFS.subset("LocalChkInsertBytesReceivedAverage"), node);
> -
> - remoteChkFetchBytesSentAverage = new
TimeDecayingRunningAverage(32768+1024+500+2048/*path folding*/, 180000, 0.0,
1024*1024*1024, throttleFS == null ? null :
throttleFS.subset("RemoteChkFetchBytesSentAverage"), node);
> - remoteSskFetchBytesSentAverage = new
TimeDecayingRunningAverage(1024+1024+500, 180000, 0.0, 1024*1024*1024,
throttleFS == null ? null :
throttleFS.subset("RemoteSskFetchBytesSentAverage"), node);
> - remoteChkInsertBytesSentAverage = new
TimeDecayingRunningAverage(32768+32768+1024, 180000, 0.0, 1024*1024*1024,
throttleFS == null ? null :
throttleFS.subset("RemoteChkInsertBytesSentAverage"), node);
> - remoteSskInsertBytesSentAverage = new
TimeDecayingRunningAverage(1024+1024+500, 180000, 0.0, 1024*1024*1024,
throttleFS == null ? null :
throttleFS.subset("RemoteSskInsertBytesSentAverage"), node);
> - remoteChkFetchBytesReceivedAverage = new
TimeDecayingRunningAverage(32768+1024+500+2048/*path folding*/, 180000, 0.0,
1024*1024*1024, throttleFS == null ? null :
throttleFS.subset("RemoteChkFetchBytesReceivedAverage"), node);
> - remoteSskFetchBytesReceivedAverage = new
TimeDecayingRunningAverage(2048+500, 180000, 0.0, 1024*1024*1024, throttleFS
== null ? null : throttleFS.subset("RemoteSskFetchBytesReceivedAverage"),
node);
> - remoteChkInsertBytesReceivedAverage = new
TimeDecayingRunningAverage(32768+1024+500, 180000, 0.0, 1024*1024*1024,
throttleFS == null ? null :
throttleFS.subset("RemoteChkInsertBytesReceivedAverage"), node);
> - remoteSskInsertBytesReceivedAverage = new
TimeDecayingRunningAverage(1024+1024+500, 180000, 0.0, 1024*1024*1024,
throttleFS == null ? null :
throttleFS.subset("RemoteSskInsertBytesReceivedAverage"), node);
> -
> - successfulChkFetchBytesSentAverage = new
TimeDecayingRunningAverage(32768+1024+500+2048/*path folding*/, 180000, 0.0,
1024*1024*1024, throttleFS == null ? null :
throttleFS.subset("SuccessfulChkFetchBytesSentAverage"), node);
> - successfulSskFetchBytesSentAverage = new
TimeDecayingRunningAverage(1024+1024+500, 180000, 0.0, 1024*1024*1024,
throttleFS == null ? null :
throttleFS.subset("SuccessfulSskFetchBytesSentAverage"), node);
> - successfulChkInsertBytesSentAverage = new
TimeDecayingRunningAverage(32768+32768+1024, 180000, 0.0, 1024*1024*1024,
throttleFS == null ? null :
throttleFS.subset("SuccessfulChkInsertBytesSentAverage"), node);
> - successfulSskInsertBytesSentAverage = new
TimeDecayingRunningAverage(1024+1024+500, 180000, 0.0, 1024*1024*1024,
throttleFS == null ? null :
throttleFS.subset("SuccessfulSskInsertBytesSentAverage"), node);
> - successfulChkFetchBytesReceivedAverage = new
TimeDecayingRunningAverage(32768+1024+500+2048/*path folding*/, 180000, 0.0,
1024*1024*1024, throttleFS == null ? null :
throttleFS.subset("SuccessfulChkFetchBytesReceivedAverage"), node);
> - successfulSskFetchBytesReceivedAverage = new
TimeDecayingRunningAverage(2048+500, 180000, 0.0, 1024*1024*1024, throttleFS
== null ? null : throttleFS.subset("SuccessfulSskFetchBytesReceivedAverage"),
node);
> - successfulChkInsertBytesReceivedAverage = new
TimeDecayingRunningAverage(32768+1024+500, 180000, 0.0, 1024*1024*1024,
throttleFS == null ? null :
throttleFS.subset("SuccessfulChkInsertBytesReceivedAverage"), node);
> - successfulSskInsertBytesReceivedAverage = new
TimeDecayingRunningAverage(1024+1024+500, 180000, 0.0, 1024*1024*1024,
throttleFS == null ? null :
throttleFS.subset("SuccessfulSskInsertBytesReceivedAverage"), node);
> -
> - globalFetchPSuccess = new TrivialRunningAverage();
> - chkFetchPSuccess = new TrivialRunningAverage();
> - sskFetchPSuccess = new TrivialRunningAverage();
> - localFetchPSuccess = new TrivialRunningAverage();
> - remoteFetchPSuccess = new TrivialRunningAverage();
> -
> - requestOutputThrottle =
> - new TokenBucket(Math.max(obwLimit*60, 32768*20), (int)
((1000L*1000L*1000L) / (obwLimit * FRACTION_OF_BANDWIDTH_USED_BY_REQUESTS)),
0);
> - requestInputThrottle =
> - new TokenBucket(Math.max(ibwLimit*60, 32768*20), (int)
((1000L*1000L*1000L) / (ibwLimit * FRACTION_OF_BANDWIDTH_USED_BY_REQUESTS)),
0);
> -
> - estimatedSizeOfOneThrottledPacket = 1024 +
> DMT.packetTransmitSize(1024,
32) +
> - node.estimateFullHeadersLengthOneMessage();
> - }
> -
> - protected String l10n(String key) {
> - return L10n.getString("NodeStats."+key);
> - }
> -
> - public void start() throws NodeInitException {
> - nodePinger.start();
> - persister.start();
> -
> node.getTicker().queueTimedJob(throttledPacketSendAverageIdleUpdater,
CHECK_THROTTLE_TIME);
> - }
> -
> - /** Every 60 seconds, check whether we need to adjust the bandwidth
> delay
time because of idleness.
> - * (If no packets have been sent, the throttledPacketSendAverage should
decrease; if it doesn't, it may go high,
> - * and then no requests will be accepted, and it will stay high
> forever.
*/
> - static final int CHECK_THROTTLE_TIME = 60 * 1000;
> - /** Absolute limit of 4MB queued to any given peer. FIXME make this
configurable.
> - * Note that for many MessageItem's, the actual memory usage will be
significantly more than this figure. */
> - private static final long MAX_PEER_QUEUE_BYTES = 4 * 1024 * 1024;
> - /** Don't accept requests if it'll take more than 10 minutes to send
> the
current message queue */
> - private static final double MAX_PEER_QUEUE_TIME = 10 * 60 * 1000.0;
> -
> - private long lastAcceptedRequest = -1;
> -
> - final int estimatedSizeOfOneThrottledPacket;
> -
> - final Runnable throttledPacketSendAverageIdleUpdater =
> - new Runnable() {
> - public void run() {
> - long now = System.currentTimeMillis();
> - try {
> -
> if(throttledPacketSendAverage.lastReportTime() < now - 5000) { // if
last report more than 5 seconds ago
> - // shouldn't take long
> -
> node.outputThrottle.blockingGrab(estimatedSizeOfOneThrottledPacket);
> -
> node.outputThrottle.recycle(estimatedSizeOfOneThrottledPacket);
> - long after =
> System.currentTimeMillis();
> - // Report time it takes to grab
> the bytes.
> -
> throttledPacketSendAverage.report(after - now);
> - }
> - } catch (Throwable t) {
> - Logger.error(this, "Caught "+t, t);
> - } finally {
> - node.getTicker().queueTimedJob(this,
> CHECK_THROTTLE_TIME);
> - long end = System.currentTimeMillis();
> - if(logMINOR)
> - Logger.minor(this, "Throttle
> check
took "+TimeUtil.formatTime(end-now,2,true));
> - }
> - }
> - };
> -
> - /* return reject reason as string if should reject, otherwise return
> null
*/
> - public String shouldRejectRequest(boolean canAcceptAnyway, boolean
isInsert, boolean isSSK, boolean isLocal, PeerNode source) {
> - logMINOR = Logger.shouldLog(Logger.MINOR, this);
> - if(logMINOR) dumpByteCostAverages();
> -
> - int threadCount = getActiveThreadCount();
> - if(threadLimit < threadCount) {
> - pInstantRejectIncoming.report(1.0);
> - rejected(">threadLimit", isLocal);
> - return ">threadLimit ("+threadCount+'/'+threadLimit+')';
> - }
> -
> - double bwlimitDelayTime =
> throttledPacketSendAverage.currentValue();
> -
> - // If no recent reports, no packets have been sent; correct the
> average
downwards.
> - long now = System.currentTimeMillis();
> - double pingTime = nodePinger.averagePingTime();
> - synchronized(this) {
> - // Round trip time
> - if(pingTime > MAX_PING_TIME) {
> - if((now - lastAcceptedRequest >
> MAX_INTERREQUEST_TIME) &&
canAcceptAnyway) {
> - if(logMINOR) Logger.minor(this,
> "Accepting request anyway (take one
every 10 secs to keep bwlimitDelayTime updated)");
> - } else {
> - pInstantRejectIncoming.report(1.0);
> - rejected(">MAX_PING_TIME", isLocal);
> - return ">MAX_PING_TIME
> ("+TimeUtil.formatTime((long)pingTime, 2,
true)+ ')';
> - }
> - } else if(pingTime > SUB_MAX_PING_TIME) {
> - double x = ((double)(pingTime -
> SUB_MAX_PING_TIME)) / (MAX_PING_TIME -
SUB_MAX_PING_TIME);
> - if(hardRandom.nextDouble() < x) {
> - pInstantRejectIncoming.report(1.0);
> - rejected(">SUB_MAX_PING_TIME", isLocal);
> - return ">SUB_MAX_PING_TIME
> ("+TimeUtil.formatTime((long)pingTime, 2,
true)+ ')';
> - }
> - }
> -
> - // Bandwidth limited packets
> - if(bwlimitDelayTime > MAX_THROTTLE_DELAY) {
> - if((now - lastAcceptedRequest >
> MAX_INTERREQUEST_TIME) &&
canAcceptAnyway) {
> - if(logMINOR) Logger.minor(this,
> "Accepting request anyway (take one
every 10 secs to keep bwlimitDelayTime updated)");
> - } else {
> - pInstantRejectIncoming.report(1.0);
> - rejected(">MAX_THROTTLE_DELAY",
> isLocal);
> - return ">MAX_THROTTLE_DELAY
("+TimeUtil.formatTime((long)bwlimitDelayTime, 2, true)+ ')';
> - }
> - } else if(bwlimitDelayTime > SUB_MAX_THROTTLE_DELAY) {
> - double x = ((double)(bwlimitDelayTime -
> SUB_MAX_THROTTLE_DELAY)) /
(MAX_THROTTLE_DELAY - SUB_MAX_THROTTLE_DELAY);
> - if(hardRandom.nextDouble() < x) {
> - pInstantRejectIncoming.report(1.0);
> - rejected(">SUB_MAX_THROTTLE_DELAY",
> isLocal);
> - return ">SUB_MAX_THROTTLE_DELAY
("+TimeUtil.formatTime((long)bwlimitDelayTime, 2, true)+ ')';
> - }
> - }
> -
> - }
> -
> - // Successful cluster timeout protection.
> - // Reject request if the result of all our current requests
> completing
simultaneously would be that
> - // some of them timeout.
> -
> - // Never reject a CHK and accept an SSK. Because if we do that,
> we would
be constantly accepting SSKs, as there
> - // would never be enough space for a CHK. So we add 1 to each
> type of
request's count before computing the
> - // bandwidth liability. Thus, if we have exactly enough space
> for 1 SSK
and 1 CHK, we can accept either, and
> - // when one of either type completes, we can accept one of
> either type
again: We never let SSKs drain the
> - // "bucket" and block CHKs.
> -
> - int numCHKRequests = node.getNumCHKRequests() + 1;
> - int numSSKRequests = node.getNumSSKRequests() + 1;
> - int numCHKInserts = node.getNumCHKInserts() + 1;
> - int numSSKInserts = node.getNumSSKInserts() + 1;
> - if(logMINOR)
> - Logger.minor(this, "Running (adjusted): CHK fetch
> "+numCHKRequests+" SSK
fetch "+numSSKRequests+" CHK insert "+numCHKInserts+" SSK
insert "+numSSKInserts);
> -
> - double bandwidthLiabilityOutput =
> - successfulChkFetchBytesSentAverage.currentValue() *
> numCHKRequests +
> - successfulSskFetchBytesSentAverage.currentValue() *
> numSSKRequests +
> - successfulChkInsertBytesSentAverage.currentValue() *
> numCHKInserts +
> - successfulSskInsertBytesSentAverage.currentValue() *
> numSSKInserts;
> - double bandwidthAvailableOutput =
> - node.getOutputBandwidthLimit() * 90; // 90 seconds at
> full power; we
have to leave some time for the search as well
> - bandwidthAvailableOutput *=
NodeStats.FRACTION_OF_BANDWIDTH_USED_BY_REQUESTS;
> - if(bandwidthLiabilityOutput > bandwidthAvailableOutput) {
> - pInstantRejectIncoming.report(1.0);
> - rejected("Output bandwidth liability", isLocal);
> - return "Output bandwidth liability
> ("+bandwidthLiabilityOutput+"
> "+bandwidthAvailableOutput+")";
> - }
> -
> - double bandwidthLiabilityInput =
> - successfulChkFetchBytesReceivedAverage.currentValue() *
> numCHKRequests +
> - successfulSskFetchBytesReceivedAverage.currentValue() *
> numSSKRequests +
> - successfulChkInsertBytesReceivedAverage.currentValue()
> * numCHKInserts +
> - successfulSskInsertBytesReceivedAverage.currentValue()
> * numSSKInserts;
> - double bandwidthAvailableInput =
> - node.getInputBandwidthLimit() * 90; // 90 seconds at
> full power
> - bandwidthAvailableInput *=
NodeStats.FRACTION_OF_BANDWIDTH_USED_BY_REQUESTS;
> - if(bandwidthLiabilityInput > bandwidthAvailableInput) {
> - pInstantRejectIncoming.report(1.0);
> - rejected("Input bandwidth liability", isLocal);
> - return "Input bandwidth liability
> ("+bandwidthLiabilityInput+"
> "+bandwidthAvailableInput+")";
> - }
> -
> -
> - // Do we have the bandwidth?
> - double expected = this.getThrottle(isLocal, isInsert, isSSK,
true).currentValue();
> - int expectedSent = (int)Math.max(expected, 0);
> - if(logMINOR)
> - Logger.minor(this, "Expected sent bytes:
> "+expectedSent);
> - if(!requestOutputThrottle.instantGrab(expectedSent)) {
> - pInstantRejectIncoming.report(1.0);
> - rejected("Insufficient output bandwidth", isLocal);
> - return "Insufficient output bandwidth";
> - }
> - expected = this.getThrottle(isLocal, isInsert, isSSK,
false).currentValue();
> - int expectedReceived = (int)Math.max(expected, 0);
> - if(logMINOR)
> - Logger.minor(this, "Expected received bytes:
> "+expectedSent);
> - if(!requestInputThrottle.instantGrab(expectedReceived)) {
> - requestOutputThrottle.recycle(expectedSent);
> - pInstantRejectIncoming.report(1.0);
> - rejected("Insufficient input bandwidth", isLocal);
> - return "Insufficient input bandwidth";
> - }
> -
> - Runtime r = Runtime.getRuntime();
> - long maxHeapMemory = r.maxMemory();
> - long totalHeapMemory = r.totalMemory();
> - long freeHeapMemory = r.freeMemory();
> - if(maxHeapMemory < Long.MAX_VALUE) { // would mean unlimited
> - freeHeapMemory = maxHeapMemory - (totalHeapMemory -
> freeHeapMemory);
> - }
> - if(freeHeapMemory < freeHeapBytesThreshold) {
> - pInstantRejectIncoming.report(1.0);
> - rejected("<freeHeapBytesThreshold", isLocal);
> - return "<freeHeapBytesThreshold
> ("+SizeUtil.formatSize(freeHeapMemory,
false)+" of "+SizeUtil.formatSize(maxHeapMemory, false)+')';
> - }
> - double percentFreeHeapMemoryOfMax = ((double) freeHeapMemory) /
> ((double)
maxHeapMemory);
> - double freeHeapPercentThresholdDouble = ((double)
freeHeapPercentThreshold) / ((double) 100);
> - if(percentFreeHeapMemoryOfMax < freeHeapPercentThresholdDouble)
> {
> - pInstantRejectIncoming.report(1.0);
> - DecimalFormat fix3p1pct = new DecimalFormat("##0.0%");
> - rejected("<freeHeapPercentThreshold", isLocal);
> - return "<freeHeapPercentThreshold
> ("+SizeUtil.formatSize(freeHeapMemory,
false)+" of "+SizeUtil.formatSize(maxHeapMemory, false)+"
("+fix3p1pct.format(percentFreeHeapMemoryOfMax)+"))";
> - }
> -
> - if(source != null) {
> - long queuedBytes = source.getMessageQueueLengthBytes();
> - if(queuedBytes > MAX_PEER_QUEUE_BYTES)
> - return "Too many message bytes queued for peer";
> - if(queuedBytes /
> (source.getThrottle().getBandwidth()+1.0) >
MAX_PEER_QUEUE_TIME)
> - return "Peer's queue will take too long to
> transfer";
> - }
> -
> - synchronized(this) {
> - if(logMINOR) Logger.minor(this, "Accepting request?");
> - lastAcceptedRequest = now;
> - }
> -
> - pInstantRejectIncoming.report(0.0);
> -
> - // Accept
> - return null;
> - }
> -
> - private void rejected(String reason, boolean isLocal) {
> - if(!isLocal) preemptiveRejectReasons.inc(reason);
> - else this.localPreemptiveRejectReasons.inc(reason);
> - }
> -
> - private RunningAverage getThrottle(boolean isLocal, boolean isInsert,
boolean isSSK, boolean isSent) {
> - if(isLocal) {
> - if(isInsert) {
> - if(isSSK) {
> - return isSent ?
> this.localSskInsertBytesSentAverage :
this.localSskInsertBytesReceivedAverage;
> - } else {
> - return isSent ?
> this.localChkInsertBytesSentAverage :
this.localChkInsertBytesReceivedAverage;
> - }
> - } else {
> - if(isSSK) {
> - return isSent ?
> this.localSskFetchBytesSentAverage :
this.localSskFetchBytesReceivedAverage;
> - } else {
> - return isSent ?
> this.localChkFetchBytesSentAverage :
this.localChkFetchBytesReceivedAverage;
> - }
> - }
> - } else {
> - if(isInsert) {
> - if(isSSK) {
> - return isSent ?
> this.remoteSskInsertBytesSentAverage :
this.remoteSskInsertBytesReceivedAverage;
> - } else {
> - return isSent ?
> this.remoteChkInsertBytesSentAverage :
this.remoteChkInsertBytesReceivedAverage;
> - }
> - } else {
> - if(isSSK) {
> - return isSent ?
> this.remoteSskFetchBytesSentAverage :
this.remoteSskFetchBytesReceivedAverage;
> - } else {
> - return isSent ?
> this.remoteChkFetchBytesSentAverage :
this.remoteChkFetchBytesReceivedAverage;
> - }
> - }
> - }
> - }
> -
> - private void dumpByteCostAverages() {
> - Logger.minor(this, "Byte cost averages: REMOTE:"+
> - " CHK insert
> "+remoteChkInsertBytesSentAverage.currentValue()+ '/'
+remoteChkInsertBytesReceivedAverage.currentValue()+
> - " SSK insert
> "+remoteSskInsertBytesSentAverage.currentValue()+ '/'
+remoteSskInsertBytesReceivedAverage.currentValue()+
> - " CHK fetch
> "+remoteChkFetchBytesSentAverage.currentValue()+ '/'
+remoteChkFetchBytesReceivedAverage.currentValue()+
> - " SSK fetch
> "+remoteSskFetchBytesSentAverage.currentValue()+ '/'
+remoteSskFetchBytesReceivedAverage.currentValue());
> - Logger.minor(this, "Byte cost averages: LOCAL:"+
> - " CHK insert
> "+localChkInsertBytesSentAverage.currentValue()+ '/'
+localChkInsertBytesReceivedAverage.currentValue()+
> - " SSK insert
> "+localSskInsertBytesSentAverage.currentValue()+ '/'
+localSskInsertBytesReceivedAverage.currentValue()+
> - " CHK fetch
> "+localChkFetchBytesSentAverage.currentValue()+ '/'
+localChkFetchBytesReceivedAverage.currentValue()+
> - " SSK fetch
> "+localSskFetchBytesSentAverage.currentValue()+ '/'
+localSskFetchBytesReceivedAverage.currentValue());
> - Logger.minor(this, "Byte cost averages: SUCCESSFUL:"+
> - " CHK insert
> "+successfulChkInsertBytesSentAverage.currentValue()+ '/'
+successfulChkInsertBytesReceivedAverage.currentValue()+
> - " SSK insert
> "+successfulSskInsertBytesSentAverage.currentValue()+ '/'
+successfulSskInsertBytesReceivedAverage.currentValue()+
> - " CHK fetch
> "+successfulChkFetchBytesSentAverage.currentValue()+ '/'
+successfulChkFetchBytesReceivedAverage.currentValue()+
> - " SSK fetch
> "+successfulSskFetchBytesSentAverage.currentValue()+ '/'
+successfulSskFetchBytesReceivedAverage.currentValue());
> -
> - }
> -
> - public double getBwlimitDelayTime() {
> - return throttledPacketSendAverage.currentValue();
> - }
> -
> - public double getNodeAveragePingTime() {
> - return nodePinger.averagePingTime();
> - }
> -
> - public int getNetworkSizeEstimate(long timestamp) {
> - return node.lm.getNetworkSizeEstimate( timestamp );
> - }
> -
> - public Object[] getKnownLocations(long timestamp) {
> - return node.lm.getKnownLocations( timestamp );
> - }
> -
> - public double pRejectIncomingInstantly() {
> - return pInstantRejectIncoming.currentValue();
> - }
> -
> - /**
> - * Update peerManagerUserAlertStats if the timer has expired.
> - * Only called from PacketSender so doesn't need sync.
> - */
> - public void maybeUpdatePeerManagerUserAlertStats(long now) {
> - if(now > nextPeerManagerUserAlertStatsUpdateTime) {
> - if(getBwlimitDelayTime() >
> MAX_BWLIMIT_DELAY_TIME_ALERT_THRESHOLD) {
> - if(firstBwlimitDelayTimeThresholdBreak == 0) {
> - firstBwlimitDelayTimeThresholdBreak =
> now;
> - }
> - } else {
> - firstBwlimitDelayTimeThresholdBreak = 0;
> - }
> - if((firstBwlimitDelayTimeThresholdBreak != 0) && ((now
> -
firstBwlimitDelayTimeThresholdBreak) >= MAX_BWLIMIT_DELAY_TIME_ALERT_DELAY))
{
> - bwlimitDelayAlertRelevant = true;
> - } else {
> - bwlimitDelayAlertRelevant = false;
> - }
> - if(getNodeAveragePingTime() >
MAX_NODE_AVERAGE_PING_TIME_ALERT_THRESHOLD) {
> - if(firstNodeAveragePingTimeThresholdBreak == 0)
> {
> - firstNodeAveragePingTimeThresholdBreak
> = now;
> - }
> - } else {
> - firstNodeAveragePingTimeThresholdBreak = 0;
> - }
> - if((firstNodeAveragePingTimeThresholdBreak != 0) &&
> ((now -
firstNodeAveragePingTimeThresholdBreak) >=
MAX_NODE_AVERAGE_PING_TIME_ALERT_DELAY)) {
> - nodeAveragePingAlertRelevant = true;
> - } else {
> - nodeAveragePingAlertRelevant = false;
> - }
> - if(logMINOR && Logger.shouldLog(Logger.DEBUG, this))
Logger.debug(this, "mUPMUAS: "+now+": "+getBwlimitDelayTime()+"
>? "+MAX_BWLIMIT_DELAY_TIME_ALERT_THRESHOLD+"
since "+firstBwlimitDelayTimeThresholdBreak+"
("+bwlimitDelayAlertRelevant+") "+getNodeAveragePingTime()+"
>? "+MAX_NODE_AVERAGE_PING_TIME_ALERT_THRESHOLD+"
since "+firstNodeAveragePingTimeThresholdBreak+"
("+nodeAveragePingAlertRelevant+ ')');
> - nextPeerManagerUserAlertStatsUpdateTime = now +
peerManagerUserAlertStatsUpdateInterval;
> - }
> - }
> -
> - public SimpleFieldSet persistThrottlesToFieldSet() {
> - SimpleFieldSet fs = new SimpleFieldSet(true);
> - fs.put("RemoteChkFetchBytesSentAverage",
remoteChkFetchBytesSentAverage.exportFieldSet(true));
> - fs.put("RemoteSskFetchBytesSentAverage",
remoteSskFetchBytesSentAverage.exportFieldSet(true));
> - fs.put("RemoteChkInsertBytesSentAverage",
remoteChkInsertBytesSentAverage.exportFieldSet(true));
> - fs.put("RemoteSskInsertBytesSentAverage",
remoteSskInsertBytesSentAverage.exportFieldSet(true));
> - fs.put("RemoteChkFetchBytesReceivedAverage",
remoteChkFetchBytesReceivedAverage.exportFieldSet(true));
> - fs.put("RemoteSskFetchBytesReceivedAverage",
remoteSskFetchBytesReceivedAverage.exportFieldSet(true));
> - fs.put("RemoteChkInsertBytesReceivedAverage",
remoteChkInsertBytesReceivedAverage.exportFieldSet(true));
> - fs.put("RemoteSskInsertBytesReceivedAverage",
remoteSskInsertBytesReceivedAverage.exportFieldSet(true));
> - fs.put("LocalChkFetchBytesSentAverage",
localChkFetchBytesSentAverage.exportFieldSet(true));
> - fs.put("LocalSskFetchBytesSentAverage",
localSskFetchBytesSentAverage.exportFieldSet(true));
> - fs.put("LocalChkInsertBytesSentAverage",
localChkInsertBytesSentAverage.exportFieldSet(true));
> - fs.put("LocalSskInsertBytesSentAverage",
localSskInsertBytesSentAverage.exportFieldSet(true));
> - fs.put("LocalChkFetchBytesReceivedAverage",
localChkFetchBytesReceivedAverage.exportFieldSet(true));
> - fs.put("LocalSskFetchBytesReceivedAverage",
localSskFetchBytesReceivedAverage.exportFieldSet(true));
> - fs.put("LocalChkInsertBytesReceivedAverage",
localChkInsertBytesReceivedAverage.exportFieldSet(true));
> - fs.put("LocalSskInsertBytesReceivedAverage",
localSskInsertBytesReceivedAverage.exportFieldSet(true));
> - fs.put("SuccessfulChkFetchBytesSentAverage",
successfulChkFetchBytesSentAverage.exportFieldSet(true));
> - fs.put("SuccessfulSskFetchBytesSentAverage",
successfulSskFetchBytesSentAverage.exportFieldSet(true));
> - fs.put("SuccessfulChkInsertBytesSentAverage",
successfulChkInsertBytesSentAverage.exportFieldSet(true));
> - fs.put("SuccessfulSskInsertBytesSentAverage",
successfulSskInsertBytesSentAverage.exportFieldSet(true));
> - fs.put("SuccessfulChkFetchBytesReceivedAverage",
successfulChkFetchBytesReceivedAverage.exportFieldSet(true));
> - fs.put("SuccessfulSskFetchBytesReceivedAverage",
successfulSskFetchBytesReceivedAverage.exportFieldSet(true));
> - fs.put("SuccessfulChkInsertBytesReceivedAverage",
successfulChkInsertBytesReceivedAverage.exportFieldSet(true));
> - fs.put("SuccessfulSskInsertBytesReceivedAverage",
successfulSskInsertBytesReceivedAverage.exportFieldSet(true));
> - return fs;
> - }
> -
> - /**
> - * Update the node-wide bandwidth I/O stats if the timer has expired
> - */
> - public void maybeUpdateNodeIOStats(long now) {
> - if(now > nextNodeIOStatsUpdateTime) {
> - long[] io_stats = IOStatisticCollector.getTotalIO();
> - long outdiff;
> - long indiff;
> - synchronized(ioStatSync) {
> - previous_output_stat = last_output_stat;
> - previous_input_stat = last_input_stat;
> - previous_io_stat_time = last_io_stat_time;
> - last_output_stat = io_stats[ 0 ];
> - last_input_stat = io_stats[ 1 ];
> - last_io_stat_time = now;
> - outdiff = last_output_stat -
> previous_output_stat;
> - indiff = last_input_stat - previous_input_stat;
> - }
> - if(logMINOR)
> - Logger.minor(this, "Last 2 seconds: input:
> "+indiff+"
output: "+outdiff);
> - nextNodeIOStatsUpdateTime = now +
> nodeIOStatsUpdateInterval;
> - }
> - }
> -
> - public long[] getNodeIOStats() {
> - long[] result = new long[6];
> - synchronized(ioStatSync) {
> - result[ 0 ] = previous_output_stat;
> - result[ 1 ] = previous_input_stat;
> - result[ 2 ] = previous_io_stat_time;
> - result[ 3 ] = last_output_stat;
> - result[ 4 ] = last_input_stat;
> - result[ 5 ] = last_io_stat_time;
> - }
> - return result;
> - }
> -
> - public void waitUntilNotOverloaded(boolean isInsert) {
> - while(threadLimit < getActiveThreadCount()){
> - try{
> - Thread.sleep(5000);
> - } catch (InterruptedException e) {}
> - }
> - }
> -
> - public int getActiveThreadCount() {
> - return rootThreadGroup.activeCount() -
> node.executor.waitingThreads();
> - }
> -
> - public int getThreadLimit() {
> - return threadLimit;
> - }
> -
> - public SimpleFieldSet exportVolatileFieldSet() {
> - SimpleFieldSet fs = new SimpleFieldSet(true);
> - long now = System.currentTimeMillis();
> - fs.put("isUsingWrapper", node.isUsingWrapper());
> - long nodeUptimeSeconds = 0;
> - synchronized(this) {
> - fs.put("startupTime", node.startupTime);
> - nodeUptimeSeconds = (now - node.startupTime) / 1000;
> - fs.put("uptimeSeconds", nodeUptimeSeconds);
> - }
> - fs.put("averagePingTime", getNodeAveragePingTime());
> - fs.put("bwlimitDelayTime", getBwlimitDelayTime());
> - fs.put("networkSizeEstimateSession",
> getNetworkSizeEstimate(-1));
> - int networkSizeEstimate24hourRecent =
> getNetworkSizeEstimate(now -
(24*60*60*1000)); // 24 hours
> - fs.put("networkSizeEstimate24hourRecent",
networkSizeEstimate24hourRecent);
> - int networkSizeEstimate48hourRecent =
> getNetworkSizeEstimate(now -
(48*60*60*1000)); // 48 hours
> - fs.put("networkSizeEstimate48hourRecent",
networkSizeEstimate48hourRecent);
> - fs.put("routingMissDistance",
> routingMissDistance.currentValue());
> - fs.put("backedOffPercent", backedOffPercent.currentValue());
> - fs.put("pInstantReject", pRejectIncomingInstantly());
> - fs.put("unclaimedFIFOSize", node.usm.getUnclaimedFIFOSize());
> -
> - /* gather connection statistics */
> - DarknetPeerNodeStatus[] peerNodeStatuses =
peers.getDarknetPeerNodeStatuses();
> - Arrays.sort(peerNodeStatuses, new Comparator() {
> - public int compare(Object first, Object second) {
> - DarknetPeerNodeStatus firstNode =
> (DarknetPeerNodeStatus) first;
> - DarknetPeerNodeStatus secondNode =
> (DarknetPeerNodeStatus) second;
> - int statusDifference =
> firstNode.getStatusValue() -
secondNode.getStatusValue();
> - if (statusDifference != 0) {
> - return statusDifference;
> - }
> - return
> firstNode.getName().compareToIgnoreCase(secondNode.getName());
> - }
> - });
> -
> - int numberOfConnected =
PeerNodeStatus.getPeerStatusCount(peerNodeStatuses,
PeerManager.PEER_NODE_STATUS_CONNECTED);
> - int numberOfRoutingBackedOff =
PeerNodeStatus.getPeerStatusCount(peerNodeStatuses,
PeerManager.PEER_NODE_STATUS_ROUTING_BACKED_OFF);
> - int numberOfTooNew =
> PeerNodeStatus.getPeerStatusCount(peerNodeStatuses,
PeerManager.PEER_NODE_STATUS_TOO_NEW);
> - int numberOfTooOld =
> PeerNodeStatus.getPeerStatusCount(peerNodeStatuses,
PeerManager.PEER_NODE_STATUS_TOO_OLD);
> - int numberOfDisconnected =
PeerNodeStatus.getPeerStatusCount(peerNodeStatuses,
PeerManager.PEER_NODE_STATUS_DISCONNECTED);
> - int numberOfNeverConnected =
PeerNodeStatus.getPeerStatusCount(peerNodeStatuses,
PeerManager.PEER_NODE_STATUS_NEVER_CONNECTED);
> - int numberOfDisabled =
PeerNodeStatus.getPeerStatusCount(peerNodeStatuses,
PeerManager.PEER_NODE_STATUS_DISABLED);
> - int numberOfBursting =
PeerNodeStatus.getPeerStatusCount(peerNodeStatuses,
PeerManager.PEER_NODE_STATUS_BURSTING);
> - int numberOfListening =
PeerNodeStatus.getPeerStatusCount(peerNodeStatuses,
PeerManager.PEER_NODE_STATUS_LISTENING);
> - int numberOfListenOnly =
PeerNodeStatus.getPeerStatusCount(peerNodeStatuses,
PeerManager.PEER_NODE_STATUS_LISTEN_ONLY);
> -
> - int numberOfSimpleConnected = numberOfConnected +
numberOfRoutingBackedOff;
> - int numberOfNotConnected = numberOfTooNew + numberOfTooOld +
numberOfDisconnected + numberOfNeverConnected + numberOfDisabled +
numberOfBursting + numberOfListening + numberOfListenOnly;
> -
> - fs.put("numberOfConnected", numberOfConnected);
> - fs.put("numberOfRoutingBackedOff", numberOfRoutingBackedOff);
> - fs.put("numberOfTooNew", numberOfTooNew);
> - fs.put("numberOfTooOld", numberOfTooOld);
> - fs.put("numberOfDisconnected", numberOfDisconnected);
> - fs.put("numberOfNeverConnected", numberOfNeverConnected);
> - fs.put("numberOfDisabled", numberOfDisabled);
> - fs.put("numberOfBursting", numberOfBursting);
> - fs.put("numberOfListening", numberOfListening);
> - fs.put("numberOfListenOnly", numberOfListenOnly);
> -
> - fs.put("numberOfSimpleConnected", numberOfSimpleConnected);
> - fs.put("numberOfNotConnected", numberOfNotConnected);
> -
> - fs.put("numberOfInsertSenders", node.getNumInsertSenders());
> - fs.put("numberOfRequestSenders", node.getNumRequestSenders());
> - fs.put("numberOfTransferringRequestSenders",
node.getNumTransferringRequestSenders());
> - fs.put("numberOfARKFetchers", node.getNumARKFetchers());
> -
> - long[] total = IOStatisticCollector.getTotalIO();
> - long total_output_rate = (total[0]) / nodeUptimeSeconds;
> - long total_input_rate = (total[1]) / nodeUptimeSeconds;
> - long totalPayloadOutput = node.getTotalPayloadSent();
> - long total_payload_output_rate = totalPayloadOutput /
> nodeUptimeSeconds;
> - int total_payload_output_percent = (int) (100 *
> totalPayloadOutput /
total[0]);
> - fs.put("totalOutputBytes", total[0]);
> - fs.put("totalOutputRate", total_output_rate);
> - fs.put("totalPayloadOutputBytes", totalPayloadOutput);
> - fs.put("totalPayloadOutputRate", total_payload_output_rate);
> - fs.put("totalPayloadOutputPercent",
> total_payload_output_percent);
> - fs.put("totalInputBytes", total[1]);
> - fs.put("totalInputRate", total_input_rate);
> -
> - long[] rate = getNodeIOStats();
> - long deltaMS = (rate[5] - rate[2]);
> - double recent_output_rate = 1000.0 * (rate[3] - rate[0]) /
> deltaMS;
> - double recent_input_rate = 1000.0 * (rate[4] - rate[1]) /
> deltaMS;
> - fs.put("recentOutputRate", recent_output_rate);
> - fs.put("recentInputRate", recent_input_rate);
> -
> - String [] routingBackoffReasons =
peers.getPeerNodeRoutingBackoffReasons();
> - if(routingBackoffReasons.length != 0) {
> - for(int i=0;i<routingBackoffReasons.length;i++) {
> - fs.put("numberWithRoutingBackoffReasons." +
> routingBackoffReasons[i],
peers.getPeerNodeRoutingBackoffReasonSize(routingBackoffReasons[i]));
> - }
> - }
> -
> - double swaps = (double)node.getSwaps();
> - double noSwaps = (double)node.getNoSwaps();
> - double numberOfRemotePeerLocationsSeenInSwaps =
(double)node.getNumberOfRemotePeerLocationsSeenInSwaps();
> - fs.putSingle("numberOfRemotePeerLocationsSeenInSwaps",
Double.toString(numberOfRemotePeerLocationsSeenInSwaps));
> - double avgConnectedPeersPerNode = 0.0;
> - if ((numberOfRemotePeerLocationsSeenInSwaps > 0.0) && ((swaps >
> 0.0) ||
(noSwaps > 0.0))) {
> - avgConnectedPeersPerNode =
numberOfRemotePeerLocationsSeenInSwaps/(swaps+noSwaps);
> - }
> - fs.putSingle("avgConnectedPeersPerNode",
Double.toString(avgConnectedPeersPerNode));
> -
> - int startedSwaps = node.getStartedSwaps();
> - int swapsRejectedAlreadyLocked =
> node.getSwapsRejectedAlreadyLocked();
> - int swapsRejectedNowhereToGo =
> node.getSwapsRejectedNowhereToGo();
> - int swapsRejectedRateLimit = node.getSwapsRejectedRateLimit();
> - int swapsRejectedLoop = node.getSwapsRejectedLoop();
> - int swapsRejectedRecognizedID =
> node.getSwapsRejectedRecognizedID();
> - double locationChangePerSession =
> node.getLocationChangeSession();
> - double locationChangePerSwap = 0.0;
> - double locationChangePerMinute = 0.0;
> - double swapsPerMinute = 0.0;
> - double noSwapsPerMinute = 0.0;
> - double swapsPerNoSwaps = 0.0;
> - if (swaps > 0) {
> - locationChangePerSwap = locationChangePerSession/swaps;
> - }
> - if ((swaps > 0.0) && (nodeUptimeSeconds >= 60)) {
> - locationChangePerMinute =
> locationChangePerSession/(double)
(nodeUptimeSeconds/60.0);
> - }
> - if ((swaps > 0.0) && (nodeUptimeSeconds >= 60)) {
> - swapsPerMinute = swaps/(double)(nodeUptimeSeconds/60.0);
> - }
> - if ((noSwaps > 0.0) && (nodeUptimeSeconds >= 60)) {
> - noSwapsPerMinute =
> noSwaps/(double)(nodeUptimeSeconds/60.0);
> - }
> - if ((swaps > 0.0) && (noSwaps > 0.0)) {
> - swapsPerNoSwaps = swaps/noSwaps;
> - }
> - fs.put("locationChangePerSession", locationChangePerSession);
> - fs.put("locationChangePerSwap", locationChangePerSwap);
> - fs.put("locationChangePerMinute", locationChangePerMinute);
> - fs.put("swapsPerMinute", swapsPerMinute);
> - fs.put("noSwapsPerMinute", noSwapsPerMinute);
> - fs.put("swapsPerNoSwaps", swapsPerNoSwaps);
> - fs.put("swaps", swaps);
> - fs.put("noSwaps", noSwaps);
> - fs.put("startedSwaps", startedSwaps);
> - fs.put("swapsRejectedAlreadyLocked",
> swapsRejectedAlreadyLocked);
> - fs.put("swapsRejectedNowhereToGo", swapsRejectedNowhereToGo);
> - fs.put("swapsRejectedRateLimit", swapsRejectedRateLimit);
> - fs.put("swapsRejectedLoop", swapsRejectedLoop);
> - fs.put("swapsRejectedRecognizedID", swapsRejectedRecognizedID);
> - long fix32kb = 32 * 1024;
> - long cachedKeys = node.getChkDatacache().keyCount();
> - long cachedSize = cachedKeys * fix32kb;
> - long storeKeys = node.getChkDatastore().keyCount();
> - long storeSize = storeKeys * fix32kb;
> - long overallKeys = cachedKeys + storeKeys;
> - long overallSize = cachedSize + storeSize;
> -
> - long maxOverallKeys = node.getMaxTotalKeys();
> - long maxOverallSize = maxOverallKeys * fix32kb;
> -
> - double percentOverallKeysOfMax = (double)
(overallKeys*100)/(double)maxOverallKeys;
> -
> - long cachedStoreHits = node.getChkDatacache().hits();
> - long cachedStoreMisses = node.getChkDatacache().misses();
> - long cacheAccesses = cachedStoreHits + cachedStoreMisses;
> - double percentCachedStoreHitsOfAccesses =
> (double)(cachedStoreHits*100) /
(double)cacheAccesses;
> - long storeHits = node.getChkDatastore().hits();
> - long storeMisses = node.getChkDatastore().misses();
> - long storeAccesses = storeHits + storeMisses;
> - double percentStoreHitsOfAccesses = (double)(storeHits*100) /
(double)storeAccesses;
> - long overallAccesses = storeAccesses + cacheAccesses;
> - double avgStoreAccessRate =
(double)overallAccesses/(double)nodeUptimeSeconds;
> -
> - fs.put("cachedKeys", cachedKeys);
> - fs.put("cachedSize", cachedSize);
> - fs.put("storeKeys", storeKeys);
> - fs.put("storeSize", storeSize);
> - fs.put("overallKeys", overallKeys);
> - fs.put("overallSize", overallSize);
> - fs.put("maxOverallKeys", maxOverallKeys);
> - fs.put("maxOverallSize", maxOverallSize);
> - fs.put("percentOverallKeysOfMax", percentOverallKeysOfMax);
> - fs.put("cachedStoreHits", cachedStoreHits);
> - fs.put("cachedStoreMisses", cachedStoreMisses);
> - fs.put("cacheAccesses", cacheAccesses);
> - fs.put("percentCachedStoreHitsOfAccesses",
percentCachedStoreHitsOfAccesses);
> - fs.put("storeHits", storeHits);
> - fs.put("storeMisses", storeMisses);
> - fs.put("storeAccesses", storeAccesses);
> - fs.put("percentStoreHitsOfAccesses",
> percentStoreHitsOfAccesses);
> - fs.put("overallAccesses", overallAccesses);
> - fs.put("avgStoreAccessRate", avgStoreAccessRate);
> -
> - Runtime rt = Runtime.getRuntime();
> - float freeMemory = (float) rt.freeMemory();
> - float totalMemory = (float) rt.totalMemory();
> - float maxMemory = (float) rt.maxMemory();
> -
> - long usedJavaMem = (long)(totalMemory - freeMemory);
> - long allocatedJavaMem = (long)totalMemory;
> - long maxJavaMem = (long)maxMemory;
> - int availableCpus = rt.availableProcessors();
> -
> - fs.put("freeJavaMemory", (long)freeMemory);
> - fs.put("usedJavaMemory", usedJavaMem);
> - fs.put("allocatedJavaMemory", allocatedJavaMem);
> - fs.put("maximumJavaMemory", maxJavaMem);
> - fs.put("availableCPUs", availableCpus);
> - fs.put("runningThreadCount", getActiveThreadCount());
> -
> - return fs;
> - }
> -
> - public void setOutputLimit(int obwLimit) {
> - requestOutputThrottle.changeNanosAndBucketSize((int)
((1000L*1000L*1000L) / (obwLimit * FRACTION_OF_BANDWIDTH_USED_BY_REQUESTS)),
Math.max(obwLimit*60, 32768*20));
> - if(node.inputLimitDefault) {
> - setInputLimit(obwLimit * 4);
> - }
> - }
> -
> - public void setInputLimit(int ibwLimit) {
> -
> requestInputThrottle.changeNanosAndBucketSize((int)((1000L*1000L*1000L) /
(ibwLimit * FRACTION_OF_BANDWIDTH_USED_BY_REQUESTS)), Math.max(ibwLimit*60,
32768*20));
> - }
> -
> - public boolean isTestnetEnabled() {
> - return node.isTestnetEnabled();
> - }
> -
> - public boolean getRejectReasonsTable(HTMLNode table) {
> - return preemptiveRejectReasons.toTableRows(table) > 0;
> - }
> -
> - public boolean getLocalRejectReasonsTable(HTMLNode table) {
> - return localPreemptiveRejectReasons.toTableRows(table) > 0;
> - }
> -
> - public void requestCompleted(boolean succeeded, boolean isRemote,
> boolean
isSSK) {
> - globalFetchPSuccess.report(succeeded ? 1.0 : 0.0);
> - if(isSSK)
> - sskFetchPSuccess.report(succeeded ? 1.0 : 0.0);
> - else
> - chkFetchPSuccess.report(succeeded ? 1.0 : 0.0);
> - if(isRemote)
> - remoteFetchPSuccess.report(succeeded ? 1.0 : 0.0);
> - else
> - localFetchPSuccess.report(succeeded ? 1.0 : 0.0);
> - }
> -
> - private final DecimalFormat fix3p3pct = new DecimalFormat("##0.000%");
> - private final NumberFormat thousendPoint = NumberFormat.getInstance();
> -
> - public void fillSuccessRateBox(HTMLNode parent) {
> - HTMLNode list = parent.addChild("table", "border", "0");
> - final TrivialRunningAverage[] averages = new
> TrivialRunningAverage[] {
> - globalFetchPSuccess,
> - chkFetchPSuccess,
> - sskFetchPSuccess,
> - localFetchPSuccess,
> - remoteFetchPSuccess
> - };
> - final String[] names = new String[] {
> - // FIXME l10n, but atm this only shows up in
> advanced mode
> - "All requests",
> - "CHKs",
> - "SSKs",
> - "Local requests",
> - "Remote requests"
> - };
> - HTMLNode row = list.addChild("tr");
> - row.addChild("th", "Group");
> - row.addChild("th", "P(success)");
> - row.addChild("th", "Count");
> -
> - for(int i=0;i<averages.length;i++) {
> - row = list.addChild("tr");
> - row.addChild("td", names[i]);
> - if (averages[i].countReports()==0) {
> - row.addChild("td", "-");
> - row.addChild("td", "0");
> - } else {
> - row.addChild("td",
> fix3p3pct.format(averages[i].currentValue()));
> - row.addChild("td",
> thousendPoint.format(averages[i].countReports()));
> - }
> - }
> - }
> -
> -}
> +package freenet.node;
> +
> +import java.io.File;
> +import java.text.DecimalFormat;
> +import java.text.NumberFormat;
> +import java.util.Arrays;
> +import java.util.Comparator;
> +
> +import freenet.config.InvalidConfigValueException;
> +import freenet.config.SubConfig;
> +import freenet.crypt.RandomSource;
> +import freenet.io.comm.DMT;
> +import freenet.io.comm.IOStatisticCollector;
> +import freenet.l10n.L10n;
> +import freenet.support.HTMLNode;
> +import freenet.support.Logger;
> +import freenet.support.SimpleFieldSet;
> +import freenet.support.SizeUtil;
> +import freenet.support.StringCounter;
> +import freenet.support.TimeUtil;
> +import freenet.support.TokenBucket;
> +import freenet.support.api.BooleanCallback;
> +import freenet.support.api.IntCallback;
> +import freenet.support.api.LongCallback;
> +import freenet.support.math.BootstrappingDecayingRunningAverage;
> +import freenet.support.math.RunningAverage;
> +import freenet.support.math.TimeDecayingRunningAverage;
> +import freenet.support.math.TrivialRunningAverage;
> +
> +/** Node (as opposed to NodeClientCore) level statistics. Includes
shouldRejectRequest(), but not limited
> + * to stuff required to implement that. */
> +public class NodeStats implements Persistable {
> +
> + /** Sub-max ping time. If ping is greater than this, we reject some
requests. */
> + public static final long SUB_MAX_PING_TIME = 700;
> + /** Maximum overall average ping time. If ping is greater than this,
> + * we reject all requests. */
> + public static final long MAX_PING_TIME = 1500;
> + /** Maximum throttled packet delay. If the throttled packet delay is
greater
> + * than this, reject all packets. */
> + public static final long MAX_THROTTLE_DELAY = 3000;
> + /** If the throttled packet delay is less than this, reject no packets;
> if
it's
> + * between the two, reject some packets. */
> + public static final long SUB_MAX_THROTTLE_DELAY = 2000;
> + /** How high can bwlimitDelayTime be before we alert (in milliseconds)*/
> + public static final long MAX_BWLIMIT_DELAY_TIME_ALERT_THRESHOLD = MAX_THROTTLE_DELAY*2;
> + /** How high can nodeAveragePingTime be before we alert (in milliseconds)*/
> + public static final long MAX_NODE_AVERAGE_PING_TIME_ALERT_THRESHOLD = MAX_PING_TIME*2;
> + /** How long we're over the bwlimitDelayTime threshold before we alert (in milliseconds)*/
> + public static final long MAX_BWLIMIT_DELAY_TIME_ALERT_DELAY = 10*60*1000; // 10 minutes
> + /** How long we're over the nodeAveragePingTime threshold before we alert (in milliseconds)*/
> + public static final long MAX_NODE_AVERAGE_PING_TIME_ALERT_DELAY = 10*60*1000; // 10 minutes
> + /** Accept one request every 10 seconds regardless, to ensure we update the
> + * block send time.
> + */
> + public static final int MAX_INTERREQUEST_TIME = 10*1000;
> +
> + /** Fudge factor for high level bandwidth limiting.
> + * FIXME should be a long term running average */
> + public static final double FRACTION_OF_BANDWIDTH_USED_BY_REQUESTS = 0.8;
> +
> + private final Node node;
> + private MemoryChecker myMemoryChecker;
> + public final PeerManager peers;
> +
> + final RandomSource hardRandom;
> +
> + private boolean logMINOR;
> +
> + /** first time bwlimitDelay was over PeerManagerUserAlert threshold */
> + private long firstBwlimitDelayTimeThresholdBreak ;
> + /** first time nodeAveragePing was over PeerManagerUserAlert threshold
> */
> + private long firstNodeAveragePingTimeThresholdBreak;
> + /** bwlimitDelay PeerManagerUserAlert should happen if true */
> + public boolean bwlimitDelayAlertRelevant;
> + /** nodeAveragePing PeerManagerUserAlert should happen if true */
> + public boolean nodeAveragePingAlertRelevant;
> + /** Average proportion of requests rejected immediately due to overload
> */
> + public final TimeDecayingRunningAverage pInstantRejectIncoming;
> +
> + /** Average delay caused by throttling for sending a packet */
> + final TimeDecayingRunningAverage throttledPacketSendAverage;
> +
> + // Bytes used by each different type of local/remote chk/ssk
request/insert
> + final TimeDecayingRunningAverage remoteChkFetchBytesSentAverage;
> + final TimeDecayingRunningAverage remoteSskFetchBytesSentAverage;
> + final TimeDecayingRunningAverage remoteChkInsertBytesSentAverage;
> + final TimeDecayingRunningAverage remoteSskInsertBytesSentAverage;
> + final TimeDecayingRunningAverage remoteChkFetchBytesReceivedAverage;
> + final TimeDecayingRunningAverage remoteSskFetchBytesReceivedAverage;
> + final TimeDecayingRunningAverage remoteChkInsertBytesReceivedAverage;
> + final TimeDecayingRunningAverage remoteSskInsertBytesReceivedAverage;
> + final TimeDecayingRunningAverage localChkFetchBytesSentAverage;
> + final TimeDecayingRunningAverage localSskFetchBytesSentAverage;
> + final TimeDecayingRunningAverage localChkInsertBytesSentAverage;
> + final TimeDecayingRunningAverage localSskInsertBytesSentAverage;
> + final TimeDecayingRunningAverage localChkFetchBytesReceivedAverage;
> + final TimeDecayingRunningAverage localSskFetchBytesReceivedAverage;
> + final TimeDecayingRunningAverage localChkInsertBytesReceivedAverage;
> + final TimeDecayingRunningAverage localSskInsertBytesReceivedAverage;
> +
> + // Bytes used by successful chk/ssk request/insert.
> + // Note: These are used to determine whether to accept a request,
> + // hence they should be roughly representative of incoming - NOT LOCAL -
> + // requests. Therefore, while we DO report local successful requests,
> + // we only report the portion which will be consistent with a remote
> + // request. If there is both a Handler and a Sender, it's a remote
> + // request, report both. If there is only a Sender, report only the
> + // received bytes (for a request). Etc.
> +
> + // Note that these are always reported in the Handler or the
NodeClientCore
> + // call taking its place.
> + final TimeDecayingRunningAverage successfulChkFetchBytesSentAverage;
> + final TimeDecayingRunningAverage successfulSskFetchBytesSentAverage;
> + final TimeDecayingRunningAverage successfulChkInsertBytesSentAverage;
> + final TimeDecayingRunningAverage successfulSskInsertBytesSentAverage;
> + final TimeDecayingRunningAverage successfulChkFetchBytesReceivedAverage;
> + final TimeDecayingRunningAverage successfulSskFetchBytesReceivedAverage;
> + final TimeDecayingRunningAverage
> successfulChkInsertBytesReceivedAverage;
> + final TimeDecayingRunningAverage
> successfulSskInsertBytesReceivedAverage;
> +
> + final TrivialRunningAverage globalFetchPSuccess;
> + final TrivialRunningAverage chkFetchPSuccess;
> + final TrivialRunningAverage sskFetchPSuccess;
> + final TrivialRunningAverage localFetchPSuccess;
> + final TrivialRunningAverage remoteFetchPSuccess;
> +
> + File persistTarget;
> + File persistTemp;
> + private long previous_input_stat;
> + private long previous_output_stat;
> + private long previous_io_stat_time;
> + private long last_input_stat;
> + private long last_output_stat;
> + private long last_io_stat_time;
> + private final Object ioStatSync = new Object();
> + /** Next time to update the node I/O stats */
> + private long nextNodeIOStatsUpdateTime = -1;
> + /** Node I/O stats update interval (milliseconds) */
> + private static final long nodeIOStatsUpdateInterval = 2000;
> +
> + /** Token bucket for output bandwidth used by requests */
> + final TokenBucket requestOutputThrottle;
> + /** Token bucket for input bandwidth used by requests */
> + final TokenBucket requestInputThrottle;
> +
> + // various metrics
> + public final RunningAverage routingMissDistance;
> + public final RunningAverage backedOffPercent;
> + public final RunningAverage avgCacheLocation;
> + public final RunningAverage avgStoreLocation;
> + public final RunningAverage avgCacheSuccess;
> + public final RunningAverage avgStoreSuccess;
> + // FIXME: does furthest{Store,Cache}Success need to be synchronized?
> + public double furthestCacheSuccess=0.0;
> + public double furthestStoreSuccess=0.0;
> + protected final Persister persister;
> +
> + protected final RunningAverage avgRequestLocation;
> +
> + // ThreadCounting stuffs
> + public final ThreadGroup rootThreadGroup;
> + private int threadLimit;
> +
> + // Free heap memory threshold stuffs
> + private long freeHeapBytesThreshold;
> + private int freeHeapPercentThreshold;
> +
> + final NodePinger nodePinger;
> +
> + final StringCounter preemptiveRejectReasons;
> + final StringCounter localPreemptiveRejectReasons;
> +
> + // Enable this if you run into hard to debug OOMs.
> + // Disabled to prevent long pauses every 30 seconds.
> + private int aggressiveGCModificator = -1 /*250*/;
> +
> + // Peers stats
> + /** Next time to update PeerManagerUserAlert stats */
> + private long nextPeerManagerUserAlertStatsUpdateTime = -1;
> + /** PeerManagerUserAlert stats update interval (milliseconds) */
> + private static final long peerManagerUserAlertStatsUpdateInterval =
1000; // 1 second
> +
> + NodeStats(Node node, int sortOrder, SubConfig statsConfig,
> SimpleFieldSet
oldThrottleFS, int obwLimit, int ibwLimit) throws NodeInitException {
> + logMINOR = Logger.shouldLog(Logger.MINOR, this);
> + this.node = node;
> + this.peers = node.peers;
> + this.hardRandom = node.random;
> + this.routingMissDistance = new TimeDecayingRunningAverage(0.0,
> 180000,
0.0, 1.0, node);
> + this.backedOffPercent = new TimeDecayingRunningAverage(0.0,
> 180000, 0.0,
1.0, node);
> + double nodeLoc=node.lm.getLocation();
> + // FIXME PLEASE; (int) casts; (maxCacheKeys>MAXINT?) This value
> will
probably end up being a small constant anyway (200?).
> + this.avgCacheLocation = new
BootstrappingDecayingRunningAverage(nodeLoc, 0.0, 1.0,
(int)node.maxCacheKeys, null);
> + this.avgStoreLocation = new
BootstrappingDecayingRunningAverage(nodeLoc, 0.0, 1.0,
(int)node.maxStoreKeys, null);
> + this.avgCacheSuccess = new
BootstrappingDecayingRunningAverage(nodeLoc, 0.0, 1.0, 10000, null);
> + this.avgStoreSuccess = new
BootstrappingDecayingRunningAverage(nodeLoc, 0.0, 1.0, 10000, null);
> + this.avgRequestLocation = new
BootstrappingDecayingRunningAverage(nodeLoc, 0.0, 1.0, 10000, null);
> + preemptiveRejectReasons = new StringCounter();
> + localPreemptiveRejectReasons = new StringCounter();
> + pInstantRejectIncoming = new TimeDecayingRunningAverage(0,
> 60000, 0.0,
1.0, node);
> + ThreadGroup tg = Thread.currentThread().getThreadGroup();
> + while(tg.getParent() != null) tg = tg.getParent();
> + this.rootThreadGroup = tg;
> + throttledPacketSendAverage =
> + new TimeDecayingRunningAverage(1, 10*60*1000 /* should
> be significantly
longer than a typical transfer */, 0, Long.MAX_VALUE, node);
> + nodePinger = new NodePinger(node);
> +
> + previous_input_stat = 0;
> + previous_output_stat = 0;
> + previous_io_stat_time = 1;
> + last_input_stat = 0;
> + last_output_stat = 0;
> + last_io_stat_time = 3;
> +
> + statsConfig.register("threadLimit", 500, sortOrder++, true,
true, "NodeStat.threadLimit", "NodeStat.threadLimitLong",
> + new IntCallback() {
> + public int get() {
> + return threadLimit;
> + }
> + public void set(int val) throws
> InvalidConfigValueException {
> + if(val == get()) return;
> + if(val < 100)
> + throw new
> InvalidConfigValueException(l10n("valueTooLow"));
> + threadLimit = val;
> + }
> + });
> + threadLimit = statsConfig.getInt("threadLimit");
> +
> + // Yes it could be in seconds instead of multiples of 0.12, but we don't want people to play with it :)
> + statsConfig.register("aggressiveGC", aggressiveGCModificator,
sortOrder++, true,
false, "NodeStat.aggressiveGC", "NodeStat.aggressiveGCLong",
> + new IntCallback() {
> + public int get() {
> + return aggressiveGCModificator;
> + }
> + public void set(int val) throws
> InvalidConfigValueException {
> + if(val == get()) return;
> + Logger.normal(this, "Changing
> aggressiveGCModificator to "+val);
> + aggressiveGCModificator = val;
> + }
> + });
> + aggressiveGCModificator = statsConfig.getInt("aggressiveGC");
> +
> + myMemoryChecker = new MemoryChecker(node.ps,
> aggressiveGCModificator);
> + statsConfig.register("memoryChecker", true, sortOrder++, true,
false, "NodeStat.memCheck", "NodeStat.memCheckLong",
> + new BooleanCallback(){
> + public boolean get() {
> + return
> myMemoryChecker.isRunning();
> + }
> +
> + public void set(boolean val) throws
> InvalidConfigValueException {
> + if(val == get()) return;
> +
> + if(val)
> + myMemoryChecker.start();
> + else
> +
> myMemoryChecker.terminate();
> + }
> + });
> + if(statsConfig.getBoolean("memoryChecker"))
> + myMemoryChecker.start();
> +
> + statsConfig.register("freeHeapBytesThreshold", "5M",
> sortOrder++, true,
true, "NodeStat.freeHeapBytesThreshold", "NodeStat.freeHeapBytesThresholdLong",
> + new LongCallback() {
> + public long get() {
> + return freeHeapBytesThreshold;
> + }
> + public void set(long val) throws
> InvalidConfigValueException {
> + if(val == get()) return;
> + if(val < 0)
> + throw new
> InvalidConfigValueException(l10n("valueTooLow"));
> + freeHeapBytesThreshold = val;
> + }
> + });
> + freeHeapBytesThreshold =
> statsConfig.getLong("freeHeapBytesThreshold");
> +
> + statsConfig.register("freeHeapPercentThreshold", "5",
> sortOrder++, true,
true, "NodeStat.freeHeapPercentThreshold",
"NodeStat.freeHeapPercentThresholdLong",
> + new IntCallback() {
> + public int get() {
> + return freeHeapPercentThreshold;
> + }
> + public void set(int val) throws
> InvalidConfigValueException {
> + if(val == get()) return;
> + if(val < 0 || val >= 100)
> + throw new
InvalidConfigValueException(l10n("mustBePercentValueNotFull"));
> + freeHeapPercentThreshold = val;
> + }
> + });
> + freeHeapPercentThreshold =
statsConfig.getInt("freeHeapPercentThreshold");
> +
> + persister = new ConfigurablePersister(this,
statsConfig, "nodeThrottleFile", "node-throttle.dat", sortOrder++, true,
false,
> + "NodeStat.statsPersister",
> "NodeStat.statsPersisterLong", node.ps);
> +
> + SimpleFieldSet throttleFS = persister.read();
> +
> + if(throttleFS == null)
> + throttleFS = oldThrottleFS;
> +
> + if(logMINOR) Logger.minor(this, "Read
> throttleFS:\n"+throttleFS);
> +
> + // Guesstimates. Hopefully well over the reality.
> + localChkFetchBytesSentAverage = new
> TimeDecayingRunningAverage(500,
180000, 0.0, 1024*1024*1024, throttleFS == null ? null :
throttleFS.subset("LocalChkFetchBytesSentAverage"), node);
> + localSskFetchBytesSentAverage = new
> TimeDecayingRunningAverage(500,
180000, 0.0, 1024*1024*1024, throttleFS == null ? null :
throttleFS.subset("LocalSskFetchBytesSentAverage"), node);
> + localChkInsertBytesSentAverage = new
> TimeDecayingRunningAverage(32768,
180000, 0.0, 1024*1024*1024, throttleFS == null ? null :
throttleFS.subset("LocalChkInsertBytesSentAverage"), node);
> + localSskInsertBytesSentAverage = new
> TimeDecayingRunningAverage(2048,
180000, 0.0, 1024*1024*1024, throttleFS == null ? null :
throttleFS.subset("LocalSskInsertBytesSentAverage"), node);
> + localChkFetchBytesReceivedAverage = new
TimeDecayingRunningAverage(32768+2048/*path folding*/, 180000, 0.0,
1024*1024*1024, throttleFS == null ? null :
throttleFS.subset("LocalChkFetchBytesReceivedAverage"), node);
> + localSskFetchBytesReceivedAverage = new
> TimeDecayingRunningAverage(2048,
180000, 0.0, 1024*1024*1024, throttleFS == null ? null :
throttleFS.subset("LocalSskFetchBytesReceivedAverage"), node);
> + localChkInsertBytesReceivedAverage = new
> TimeDecayingRunningAverage(1024,
180000, 0.0, 1024*1024*1024, throttleFS == null ? null :
throttleFS.subset("LocalChkInsertBytesReceivedAverage"), node);
> + localSskInsertBytesReceivedAverage = new TimeDecayingRunningAverage(500, 180000, 0.0, 1024*1024*1024, throttleFS == null ? null : throttleFS.subset("LocalSskInsertBytesReceivedAverage"), node);
> +
> + remoteChkFetchBytesSentAverage = new
TimeDecayingRunningAverage(32768+1024+500+2048/*path folding*/, 180000, 0.0,
1024*1024*1024, throttleFS == null ? null :
throttleFS.subset("RemoteChkFetchBytesSentAverage"), node);
> + remoteSskFetchBytesSentAverage = new
TimeDecayingRunningAverage(1024+1024+500, 180000, 0.0, 1024*1024*1024,
throttleFS == null ? null :
throttleFS.subset("RemoteSskFetchBytesSentAverage"), node);
> + remoteChkInsertBytesSentAverage = new
TimeDecayingRunningAverage(32768+32768+1024, 180000, 0.0, 1024*1024*1024,
throttleFS == null ? null :
throttleFS.subset("RemoteChkInsertBytesSentAverage"), node);
> + remoteSskInsertBytesSentAverage = new
TimeDecayingRunningAverage(1024+1024+500, 180000, 0.0, 1024*1024*1024,
throttleFS == null ? null :
throttleFS.subset("RemoteSskInsertBytesSentAverage"), node);
> + remoteChkFetchBytesReceivedAverage = new
TimeDecayingRunningAverage(32768+1024+500+2048/*path folding*/, 180000, 0.0,
1024*1024*1024, throttleFS == null ? null :
throttleFS.subset("RemoteChkFetchBytesReceivedAverage"), node);
> + remoteSskFetchBytesReceivedAverage = new
TimeDecayingRunningAverage(2048+500, 180000, 0.0, 1024*1024*1024, throttleFS
== null ? null : throttleFS.subset("RemoteSskFetchBytesReceivedAverage"),
node);
> + remoteChkInsertBytesReceivedAverage = new
TimeDecayingRunningAverage(32768+1024+500, 180000, 0.0, 1024*1024*1024,
throttleFS == null ? null :
throttleFS.subset("RemoteChkInsertBytesReceivedAverage"), node);
> + remoteSskInsertBytesReceivedAverage = new
TimeDecayingRunningAverage(1024+1024+500, 180000, 0.0, 1024*1024*1024,
throttleFS == null ? null :
throttleFS.subset("RemoteSskInsertBytesReceivedAverage"), node);
> +
> + successfulChkFetchBytesSentAverage = new
TimeDecayingRunningAverage(32768+1024+500+2048/*path folding*/, 180000, 0.0,
1024*1024*1024, throttleFS == null ? null :
throttleFS.subset("SuccessfulChkFetchBytesSentAverage"), node);
> + successfulSskFetchBytesSentAverage = new
TimeDecayingRunningAverage(1024+1024+500, 180000, 0.0, 1024*1024*1024,
throttleFS == null ? null :
throttleFS.subset("SuccessfulSskFetchBytesSentAverage"), node);
> + successfulChkInsertBytesSentAverage = new
TimeDecayingRunningAverage(32768+32768+1024, 180000, 0.0, 1024*1024*1024,
throttleFS == null ? null :
throttleFS.subset("SuccessfulChkInsertBytesSentAverage"), node);
> + successfulSskInsertBytesSentAverage = new
TimeDecayingRunningAverage(1024+1024+500, 180000, 0.0, 1024*1024*1024,
throttleFS == null ? null :
throttleFS.subset("SuccessfulSskInsertBytesSentAverage"), node);
> + successfulChkFetchBytesReceivedAverage = new
TimeDecayingRunningAverage(32768+1024+500+2048/*path folding*/, 180000, 0.0,
1024*1024*1024, throttleFS == null ? null :
throttleFS.subset("SuccessfulChkFetchBytesReceivedAverage"), node);
> + successfulSskFetchBytesReceivedAverage = new
TimeDecayingRunningAverage(2048+500, 180000, 0.0, 1024*1024*1024, throttleFS
== null ? null : throttleFS.subset("SuccessfulSskFetchBytesReceivedAverage"),
node);
> + successfulChkInsertBytesReceivedAverage = new
TimeDecayingRunningAverage(32768+1024+500, 180000, 0.0, 1024*1024*1024,
throttleFS == null ? null :
throttleFS.subset("SuccessfulChkInsertBytesReceivedAverage"), node);
> + successfulSskInsertBytesReceivedAverage = new
TimeDecayingRunningAverage(1024+1024+500, 180000, 0.0, 1024*1024*1024,
throttleFS == null ? null :
throttleFS.subset("SuccessfulSskInsertBytesReceivedAverage"), node);
> +
> + globalFetchPSuccess = new TrivialRunningAverage();
> + chkFetchPSuccess = new TrivialRunningAverage();
> + sskFetchPSuccess = new TrivialRunningAverage();
> + localFetchPSuccess = new TrivialRunningAverage();
> + remoteFetchPSuccess = new TrivialRunningAverage();
> +
> + requestOutputThrottle =
> + new TokenBucket(Math.max(obwLimit*60, 32768*20), (int)
((1000L*1000L*1000L) / (obwLimit * FRACTION_OF_BANDWIDTH_USED_BY_REQUESTS)),
0);
> + requestInputThrottle =
> + new TokenBucket(Math.max(ibwLimit*60, 32768*20), (int)
((1000L*1000L*1000L) / (ibwLimit * FRACTION_OF_BANDWIDTH_USED_BY_REQUESTS)),
0);
> +
> + estimatedSizeOfOneThrottledPacket = 1024 +
> DMT.packetTransmitSize(1024,
32) +
> + node.estimateFullHeadersLengthOneMessage();
> + }
> +
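
A note from me, not part of the quoted commit: the second argument to TokenBucket above works out to nanoseconds per byte of request traffic, and the first to at least 60 seconds' worth of traffic (never less than 20 full 32KB blocks). A throwaway calculation just to make that concrete; the 16384 B/s output limit is an example value of mine, not something from this diff:

    public class RefillMath {
        public static void main(String[] args) {
            int obwLimit = 16384;   // assumed output bandwidth limit, bytes/sec (example only)
            double fraction = 0.8;  // FRACTION_OF_BANDWIDTH_USED_BY_REQUESTS
            // Nanoseconds between single-byte refills of the request output bucket:
            long nanosPerByte = (long) ((1000L * 1000L * 1000L) / (obwLimit * fraction));
            // Bucket capacity: 60 seconds of traffic, but never below 20 x 32KB.
            long capacity = Math.max(obwLimit * 60L, 32768L * 20L);
            System.out.println(nanosPerByte + " ns/byte, capacity " + capacity + " bytes");
            // 16384 * 0.8 = 13107.2 usable bytes/sec, i.e. roughly 76293 ns per byte.
        }
    }
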
> + protected String l10n(String key) {
> + return L10n.getString("NodeStats."+key);
> + }
> +
> + public void start() throws NodeInitException {
> + nodePinger.start();
> + persister.start();
> +
> node.getTicker().queueTimedJob(throttledPacketSendAverageIdleUpdater,
CHECK_THROTTLE_TIME);
> + }
> +
> + /** Every 60 seconds, check whether we need to adjust the bandwidth
> delay
time because of idleness.
> + * (If no packets have been sent, the throttledPacketSendAverage should
decrease; if it doesn't, it may go high,
> + * and then no requests will be accepted, and it will stay high
> forever.
*/
> + static final int CHECK_THROTTLE_TIME = 60 * 1000;
> + /** Absolute limit of 4MB queued to any given peer. FIXME make this
configurable.
> + * Note that for many MessageItem's, the actual memory usage will be
significantly more than this figure. */
> + private static final long MAX_PEER_QUEUE_BYTES = 4 * 1024 * 1024;
> + /** Don't accept requests if it'll take more than 10 minutes to send
> the
current message queue */
> + private static final double MAX_PEER_QUEUE_TIME = 10 * 60 * 1000.0;
> +
> + private long lastAcceptedRequest = -1;
> +
> + final int estimatedSizeOfOneThrottledPacket;
> +
> + final Runnable throttledPacketSendAverageIdleUpdater =
> + new Runnable() {
> + public void run() {
> + long now = System.currentTimeMillis();
> + try {
> +
> if(throttledPacketSendAverage.lastReportTime() < now - 5000) { // if
last report more than 5 seconds ago
> + // shouldn't take long
> +
> node.outputThrottle.blockingGrab(estimatedSizeOfOneThrottledPacket);
> +
> node.outputThrottle.recycle(estimatedSizeOfOneThrottledPacket);
> + long after =
> System.currentTimeMillis();
> + // Report time it takes to grab
> the bytes.
> +
> throttledPacketSendAverage.report(after - now);
> + }
> + } catch (Throwable t) {
> + Logger.error(this, "Caught "+t, t);
> + } finally {
> + node.getTicker().queueTimedJob(this,
> CHECK_THROTTLE_TIME);
> + long end = System.currentTimeMillis();
> + if(logMINOR)
> + Logger.minor(this, "Throttle
> check
took "+TimeUtil.formatTime(end-now,2,true));
> + }
> + }
> + };
> +
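
Inline comment from me: the idle updater above exists because the delay average only moves when something reports to it; as the comment on CHECK_THROTTLE_TIME says, if no packets are being sent the average could stick at a high value and requests would never be accepted again. Roughly the mechanism, with a toy moving average of my own standing in for TimeDecayingRunningAverage:

    public class IdleCorrectionSketch {
        // Toy exponential moving average standing in for TimeDecayingRunningAverage.
        static double avg = 2500.0;  // pretend the last measured send delay was high
        static long lastReport = 0;  // ms timestamp of the last report

        static void report(double ms, long now) {
            avg = 0.9 * avg + 0.1 * ms;
            lastReport = now;
        }

        public static void main(String[] args) {
            long now = System.currentTimeMillis();
            // The periodic job: if nothing was reported in the last 5 seconds, measure how
            // long it takes to grab one packet's worth of output bandwidth right now (close
            // to zero on an idle node) and feed that in, so the average drifts back down.
            if (lastReport < now - 5000) {
                long grabTook = 0; // an idle node can grab the bytes immediately
                report(grabTook, now);
            }
            System.out.println("corrected average: " + avg + " ms");
        }
    }
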
> + /* return reject reason as string if should reject, otherwise return
> null
*/
> + public String shouldRejectRequest(boolean canAcceptAnyway, boolean
isInsert, boolean isSSK, boolean isLocal, PeerNode source) {
> + logMINOR = Logger.shouldLog(Logger.MINOR, this);
> + if(logMINOR) dumpByteCostAverages();
> +
> + int threadCount = getActiveThreadCount();
> + if(threadLimit < threadCount) {
> + pInstantRejectIncoming.report(1.0);
> + rejected(">threadLimit", isLocal);
> + return ">threadLimit ("+threadCount+'/'+threadLimit+')';
> + }
> +
> + double bwlimitDelayTime =
> throttledPacketSendAverage.currentValue();
> +
> + // If no recent reports, no packets have been sent; correct the
> average
downwards.
> + long now = System.currentTimeMillis();
> + double pingTime = nodePinger.averagePingTime();
> + synchronized(this) {
> + // Round trip time
> + if(pingTime > MAX_PING_TIME) {
> + if((now - lastAcceptedRequest >
> MAX_INTERREQUEST_TIME) &&
canAcceptAnyway) {
> + if(logMINOR) Logger.minor(this,
> "Accepting request anyway (take one
every 10 secs to keep bwlimitDelayTime updated)");
> + } else {
> + pInstantRejectIncoming.report(1.0);
> + rejected(">MAX_PING_TIME", isLocal);
> + return ">MAX_PING_TIME
> ("+TimeUtil.formatTime((long)pingTime, 2,
true)+ ')';
> + }
> + } else if(pingTime > SUB_MAX_PING_TIME) {
> + double x = ((double)(pingTime -
> SUB_MAX_PING_TIME)) / (MAX_PING_TIME -
SUB_MAX_PING_TIME);
> + if(hardRandom.nextDouble() < x) {
> + pInstantRejectIncoming.report(1.0);
> + rejected(">SUB_MAX_PING_TIME", isLocal);
> + return ">SUB_MAX_PING_TIME
> ("+TimeUtil.formatTime((long)pingTime, 2,
true)+ ')';
> + }
> + }
> +
> + // Bandwidth limited packets
> + if(bwlimitDelayTime > MAX_THROTTLE_DELAY) {
> + if((now - lastAcceptedRequest >
> MAX_INTERREQUEST_TIME) &&
canAcceptAnyway) {
> + if(logMINOR) Logger.minor(this,
> "Accepting request anyway (take one
every 10 secs to keep bwlimitDelayTime updated)");
> + } else {
> + pInstantRejectIncoming.report(1.0);
> + rejected(">MAX_THROTTLE_DELAY",
> isLocal);
> + return ">MAX_THROTTLE_DELAY
("+TimeUtil.formatTime((long)bwlimitDelayTime, 2, true)+ ')';
> + }
> + } else if(bwlimitDelayTime > SUB_MAX_THROTTLE_DELAY) {
> + double x = ((double)(bwlimitDelayTime -
> SUB_MAX_THROTTLE_DELAY)) /
(MAX_THROTTLE_DELAY - SUB_MAX_THROTTLE_DELAY);
> + if(hardRandom.nextDouble() < x) {
> + pInstantRejectIncoming.report(1.0);
> + rejected(">SUB_MAX_THROTTLE_DELAY",
> isLocal);
> + return ">SUB_MAX_THROTTLE_DELAY
("+TimeUtil.formatTime((long)bwlimitDelayTime, 2, true)+ ')';
> + }
> + }
> +
> + }
> +
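
Quick aside from me: between SUB_MAX_PING_TIME and MAX_PING_TIME (and likewise for the throttle delay band) the rejection is probabilistic, scaling linearly with how far into the band the measurement is. A worked example; the 1100 ms ping is invented:

    public class PingRejectExample {
        public static void main(String[] args) {
            long SUB_MAX_PING_TIME = 700, MAX_PING_TIME = 1500;
            double pingTime = 1100.0; // hypothetical average ping, ms
            // Fraction of incoming requests rejected at this ping level:
            double x = (pingTime - SUB_MAX_PING_TIME) / (MAX_PING_TIME - SUB_MAX_PING_TIME);
            System.out.println("reject probability = " + x); // (1100-700)/(1500-700) = 0.5
        }
    }
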
> + // Successful cluster timeout protection.
> + // Reject request if the result of all our current requests
> completing
simultaneously would be that
> + // some of them timeout.
> +
> + // Never reject a CHK and accept an SSK. Because if we do that,
> we would
be constantly accepting SSKs, as there
> + // would never be enough space for a CHK. So we add 1 to each
> type of
request's count before computing the
> + // bandwidth liability. Thus, if we have exactly enough space
> for 1 SSK
and 1 CHK, we can accept either, and
> + // when one of either type completes, we can accept one of
> either type
again: We never let SSKs drain the
> + // "bucket" and block CHKs.
> +
> + int numCHKRequests = node.getNumCHKRequests() + 1;
> + int numSSKRequests = node.getNumSSKRequests() + 1;
> + int numCHKInserts = node.getNumCHKInserts() + 1;
> + int numSSKInserts = node.getNumSSKInserts() + 1;
> + if(logMINOR)
> + Logger.minor(this, "Running (adjusted): CHK fetch
> "+numCHKRequests+" SSK
fetch "+numSSKRequests+" CHK insert "+numCHKInserts+" SSK
insert "+numSSKInserts);
> +
> + double bandwidthLiabilityOutput =
> + successfulChkFetchBytesSentAverage.currentValue() *
> numCHKRequests +
> + successfulSskFetchBytesSentAverage.currentValue() *
> numSSKRequests +
> + successfulChkInsertBytesSentAverage.currentValue() *
> numCHKInserts +
> + successfulSskInsertBytesSentAverage.currentValue() *
> numSSKInserts;
> + double bandwidthAvailableOutput =
> + node.getOutputBandwidthLimit() * 90; // 90 seconds at
> full power; we
have to leave some time for the search as well
> + bandwidthAvailableOutput *=
NodeStats.FRACTION_OF_BANDWIDTH_USED_BY_REQUESTS;
> + if(bandwidthLiabilityOutput > bandwidthAvailableOutput) {
> + pInstantRejectIncoming.report(1.0);
> + rejected("Output bandwidth liability", isLocal);
> + return "Output bandwidth liability
> ("+bandwidthLiabilityOutput+"
> "+bandwidthAvailableOutput+")";
> + }
> +
> + double bandwidthLiabilityInput =
> + successfulChkFetchBytesReceivedAverage.currentValue() *
> numCHKRequests +
> + successfulSskFetchBytesReceivedAverage.currentValue() *
> numSSKRequests +
> + successfulChkInsertBytesReceivedAverage.currentValue()
> * numCHKInserts +
> + successfulSskInsertBytesReceivedAverage.currentValue()
> * numSSKInserts;
> + double bandwidthAvailableInput =
> + node.getInputBandwidthLimit() * 90; // 90 seconds at
> full power
> + bandwidthAvailableInput *=
NodeStats.FRACTION_OF_BANDWIDTH_USED_BY_REQUESTS;
> + if(bandwidthLiabilityInput > bandwidthAvailableInput) {
> + pInstantRejectIncoming.report(1.0);
> + rejected("Input bandwidth liability", isLocal);
> + return "Input bandwidth liability
> ("+bandwidthLiabilityInput+"
> "+bandwidthAvailableInput+")";
> + }
> +
> +
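
Another note from me: the liability checks above ask whether, if every currently running transfer (plus the one being considered, hence the +1s) completed successfully, the successful-transfer byte averages would add up to more than 90 seconds of the usable 0.8 share of the bandwidth limit. A made-up example using the sent-side seed values from the constructor above; the transfer counts and the 16384 B/s limit are mine:

    public class LiabilityExample {
        public static void main(String[] args) {
            // Seed values of the successful*BytesSentAverage fields, from the constructor above.
            double chkFetchSent  = 32768 + 1024 + 500 + 2048;
            double sskFetchSent  = 1024 + 1024 + 500;
            double chkInsertSent = 32768 + 32768 + 1024;
            double sskInsertSent = 1024 + 1024 + 500;
            // Hypothetical numbers of running transfers, each already incremented by one.
            int chkFetches = 11, sskFetches = 21, chkInserts = 6, sskInserts = 6;
            double liability = chkFetchSent * chkFetches + sskFetchSent * sskFetches
                    + chkInsertSent * chkInserts + sskInsertSent * sskInserts;
            int obwLimit = 16384; // assumed output limit, bytes/sec
            double available = obwLimit * 90 * 0.8; // 90 seconds of the request share
            System.out.println(liability + " vs " + available
                    + (liability > available ? " -> reject" : " -> accept"));
        }
    }
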
> + // Do we have the bandwidth?
> + double expected = this.getThrottle(isLocal, isInsert, isSSK,
true).currentValue();
> + int expectedSent = (int)Math.max(expected, 0);
> + if(logMINOR)
> + Logger.minor(this, "Expected sent bytes:
> "+expectedSent);
> + if(!requestOutputThrottle.instantGrab(expectedSent)) {
> + pInstantRejectIncoming.report(1.0);
> + rejected("Insufficient output bandwidth", isLocal);
> + return "Insufficient output bandwidth";
> + }
> + expected = this.getThrottle(isLocal, isInsert, isSSK,
false).currentValue();
> + int expectedReceived = (int)Math.max(expected, 0);
> + if(logMINOR)
> + Logger.minor(this, "Expected received bytes: "+expectedReceived);
> + if(!requestInputThrottle.instantGrab(expectedReceived)) {
> + requestOutputThrottle.recycle(expectedSent);
> + pInstantRejectIncoming.report(1.0);
> + rejected("Insufficient input bandwidth", isLocal);
> + return "Insufficient input bandwidth";
> + }
> +
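
Side note from me: the two instantGrab calls above form a small two-phase reservation; if the output bytes are grabbed but the input bucket cannot cover the expected received bytes, the output grab is recycled so nothing leaks. The pattern in isolation (Bucket is a stand-in of mine, not Freenet's TokenBucket):

    public class ReservationSketch {
        // Minimal stand-in for a token bucket; only what the pattern needs.
        static class Bucket {
            long tokens;
            Bucket(long t) { tokens = t; }
            boolean instantGrab(long n) { if (tokens < n) return false; tokens -= n; return true; }
            void recycle(long n) { tokens += n; }
        }

        static String tryAccept(Bucket out, Bucket in, long expectedSent, long expectedReceived) {
            if (!out.instantGrab(expectedSent)) return "Insufficient output bandwidth";
            if (!in.instantGrab(expectedReceived)) {
                out.recycle(expectedSent); // undo the first reservation on failure
                return "Insufficient input bandwidth";
            }
            return null; // accepted
        }

        public static void main(String[] args) {
            System.out.println(tryAccept(new Bucket(50000), new Bucket(1000), 2048, 34000));
        }
    }
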
> + Runtime r = Runtime.getRuntime();
> + long maxHeapMemory = r.maxMemory();
> +
> + /* There are some JVMs (for example libgcj 4.1.1) whose
Runtime.maxMemory() does not work. */
> + if(maxHeapMemory < Long.MAX_VALUE) { // would mean unlimited
> + long totalHeapMemory = r.totalMemory();
> + long freeHeapMemory = r.freeMemory();
> +
> + freeHeapMemory = maxHeapMemory - (totalHeapMemory -
> freeHeapMemory);
> +
> + if(freeHeapMemory < freeHeapBytesThreshold) {
> + pInstantRejectIncoming.report(1.0);
> + rejected("<freeHeapBytesThreshold", isLocal);
> + return "<freeHeapBytesThreshold
> ("+SizeUtil.formatSize(freeHeapMemory,
false)+" of "+SizeUtil.formatSize(maxHeapMemory, false)+')';
> + }
> + double percentFreeHeapMemoryOfMax = ((double) freeHeapMemory) /
> ((double)
maxHeapMemory);
> + double freeHeapPercentThresholdDouble = ((double)
freeHeapPercentThreshold) / ((double) 100);
> + if(percentFreeHeapMemoryOfMax < freeHeapPercentThresholdDouble)
> {
> + pInstantRejectIncoming.report(1.0);
> + DecimalFormat fix3p1pct = new DecimalFormat("##0.0%");
> + rejected("<freeHeapPercentThreshold", isLocal);
> + return "<freeHeapPercentThreshold
> ("+SizeUtil.formatSize(freeHeapMemory,
false)+" of "+SizeUtil.formatSize(maxHeapMemory, false)+"
("+fix3p1pct.format(percentFreeHeapMemoryOfMax)+"))";
> + }
> + }
> +
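
For anyone skimming the diff: this hunk is the actual workaround. On JVMs like libgcj 4.1.1, Runtime.maxMemory() does not work (the "would mean unlimited" case noted above), so "max minus used" is meaningless; guarding on maxHeapMemory < Long.MAX_VALUE simply skips the heap checks on such JVMs. A standalone illustration of the calculation, using the 5M / 5% defaults registered earlier (I am reading "5M" as 5 MiB):

    public class FreeHeapCheck {
        public static void main(String[] args) {
            Runtime r = Runtime.getRuntime();
            long maxHeap = r.maxMemory();
            if (maxHeap == Long.MAX_VALUE) {
                // libgcj-style answer: the headroom figure would be garbage, so the
                // workaround is to skip memory-based rejection entirely on such JVMs.
                System.out.println("maxMemory() not usable, skipping heap check");
                return;
            }
            // Real headroom: what is free now plus what the heap can still grow by.
            long free = maxHeap - (r.totalMemory() - r.freeMemory());
            long bytesThreshold = 5L * 1024 * 1024;  // freeHeapBytesThreshold default, "5M"
            double percentThreshold = 5 / 100.0;     // freeHeapPercentThreshold default, 5%
            boolean reject = free < bytesThreshold
                    || ((double) free / maxHeap) < percentThreshold;
            System.out.println(free + " bytes free of " + maxHeap + (reject ? " -> reject" : " -> ok"));
        }
    }
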
> + if(source != null) {
> + long queuedBytes = source.getMessageQueueLengthBytes();
> + if(queuedBytes > MAX_PEER_QUEUE_BYTES)
> + return "Too many message bytes queued for peer";
> + if(queuedBytes /
> (source.getThrottle().getBandwidth()+1.0) >
MAX_PEER_QUEUE_TIME)
> + return "Peer's queue will take too long to
> transfer";
> + }
> +
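
One more aside from me: the per-peer checks above cap the queue at an absolute 4 MiB and also reject when draining what is already queued, at the peer's current throttle rate, would take more than ten minutes (the +1.0 presumably just avoids a division by zero). A tiny example; the queue size and rate are invented, and I am reading getBandwidth() as bytes per millisecond from the way it is compared against MAX_PEER_QUEUE_TIME:

    public class PeerQueueExample {
        public static void main(String[] args) {
            long MAX_PEER_QUEUE_BYTES = 4 * 1024 * 1024;
            double MAX_PEER_QUEUE_TIME = 10 * 60 * 1000.0; // milliseconds
            long queuedBytes = 3000000; // hypothetical bytes already queued for this peer
            double bandwidth = 2.0;     // hypothetical throttle rate (assumed bytes/ms)
            boolean tooBig  = queuedBytes > MAX_PEER_QUEUE_BYTES;
            boolean tooSlow = queuedBytes / (bandwidth + 1.0) > MAX_PEER_QUEUE_TIME;
            System.out.println("tooBig=" + tooBig + " tooSlow=" + tooSlow); // false, true
        }
    }
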
> + synchronized(this) {
> + if(logMINOR) Logger.minor(this, "Accepting request?");
> + lastAcceptedRequest = now;
> + }
> +
> + pInstantRejectIncoming.report(0.0);
> +
> + // Accept
> + return null;
> + }
> +
> + private void rejected(String reason, boolean isLocal) {
> + if(!isLocal) preemptiveRejectReasons.inc(reason);
> + else this.localPreemptiveRejectReasons.inc(reason);
> + }
> +
> + private RunningAverage getThrottle(boolean isLocal, boolean isInsert,
boolean isSSK, boolean isSent) {
> + if(isLocal) {
> + if(isInsert) {
> + if(isSSK) {
> + return isSent ?
> this.localSskInsertBytesSentAverage :
this.localSskInsertBytesReceivedAverage;
> + } else {
> + return isSent ?
> this.localChkInsertBytesSentAverage :
this.localChkInsertBytesReceivedAverage;
> + }
> + } else {
> + if(isSSK) {
> + return isSent ?
> this.localSskFetchBytesSentAverage :
this.localSskFetchBytesReceivedAverage;
> + } else {
> + return isSent ?
> this.localChkFetchBytesSentAverage :
this.localChkFetchBytesReceivedAverage;
> + }
> + }
> + } else {
> + if(isInsert) {
> + if(isSSK) {
> + return isSent ?
> this.remoteSskInsertBytesSentAverage :
this.remoteSskInsertBytesReceivedAverage;
> + } else {
> + return isSent ?
> this.remoteChkInsertBytesSentAverage :
this.remoteChkInsertBytesReceivedAverage;
> + }
> + } else {
> + if(isSSK) {
> + return isSent ?
> this.remoteSskFetchBytesSentAverage :
this.remoteSskFetchBytesReceivedAverage;
> + } else {
> + return isSent ?
> this.remoteChkFetchBytesSentAverage :
this.remoteChkFetchBytesReceivedAverage;
> + }
> + }
> + }
> + }
> +
> + private void dumpByteCostAverages() {
> + Logger.minor(this, "Byte cost averages: REMOTE:"+
> + " CHK insert
> "+remoteChkInsertBytesSentAverage.currentValue()+ '/'
+remoteChkInsertBytesReceivedAverage.currentValue()+
> + " SSK insert
> "+remoteSskInsertBytesSentAverage.currentValue()+ '/'
+remoteSskInsertBytesReceivedAverage.currentValue()+
> + " CHK fetch
> "+remoteChkFetchBytesSentAverage.currentValue()+ '/'
+remoteChkFetchBytesReceivedAverage.currentValue()+
> + " SSK fetch
> "+remoteSskFetchBytesSentAverage.currentValue()+ '/'
+remoteSskFetchBytesReceivedAverage.currentValue());
> + Logger.minor(this, "Byte cost averages: LOCAL:"+
> + " CHK insert
> "+localChkInsertBytesSentAverage.currentValue()+ '/'
+localChkInsertBytesReceivedAverage.currentValue()+
> + " SSK insert
> "+localSskInsertBytesSentAverage.currentValue()+ '/'
+localSskInsertBytesReceivedAverage.currentValue()+
> + " CHK fetch
> "+localChkFetchBytesSentAverage.currentValue()+ '/'
+localChkFetchBytesReceivedAverage.currentValue()+
> + " SSK fetch
> "+localSskFetchBytesSentAverage.currentValue()+ '/'
+localSskFetchBytesReceivedAverage.currentValue());
> + Logger.minor(this, "Byte cost averages: SUCCESSFUL:"+
> + " CHK insert
> "+successfulChkInsertBytesSentAverage.currentValue()+ '/'
+successfulChkInsertBytesReceivedAverage.currentValue()+
> + " SSK insert
> "+successfulSskInsertBytesSentAverage.currentValue()+ '/'
+successfulSskInsertBytesReceivedAverage.currentValue()+
> + " CHK fetch
> "+successfulChkFetchBytesSentAverage.currentValue()+ '/'
+successfulChkFetchBytesReceivedAverage.currentValue()+
> + " SSK fetch
> "+successfulSskFetchBytesSentAverage.currentValue()+ '/'
+successfulSskFetchBytesReceivedAverage.currentValue());
> +
> + }
> +
> + public double getBwlimitDelayTime() {
> + return throttledPacketSendAverage.currentValue();
> + }
> +
> + public double getNodeAveragePingTime() {
> + return nodePinger.averagePingTime();
> + }
> +
> + public int getNetworkSizeEstimate(long timestamp) {
> + return node.lm.getNetworkSizeEstimate( timestamp );
> + }
> +
> + public Object[] getKnownLocations(long timestamp) {
> + return node.lm.getKnownLocations( timestamp );
> + }
> +
> + public double pRejectIncomingInstantly() {
> + return pInstantRejectIncoming.currentValue();
> + }
> +
> + /**
> + * Update peerManagerUserAlertStats if the timer has expired.
> + * Only called from PacketSender so doesn't need sync.
> + */
> + public void maybeUpdatePeerManagerUserAlertStats(long now) {
> + if(now > nextPeerManagerUserAlertStatsUpdateTime) {
> + if(getBwlimitDelayTime() >
> MAX_BWLIMIT_DELAY_TIME_ALERT_THRESHOLD) {
> + if(firstBwlimitDelayTimeThresholdBreak == 0) {
> + firstBwlimitDelayTimeThresholdBreak =
> now;
> + }
> + } else {
> + firstBwlimitDelayTimeThresholdBreak = 0;
> + }
> + if((firstBwlimitDelayTimeThresholdBreak != 0) && ((now
> -
firstBwlimitDelayTimeThresholdBreak) >= MAX_BWLIMIT_DELAY_TIME_ALERT_DELAY))
{
> + bwlimitDelayAlertRelevant = true;
> + } else {
> + bwlimitDelayAlertRelevant = false;
> + }
> + if(getNodeAveragePingTime() >
MAX_NODE_AVERAGE_PING_TIME_ALERT_THRESHOLD) {
> + if(firstNodeAveragePingTimeThresholdBreak == 0)
> {
> + firstNodeAveragePingTimeThresholdBreak
> = now;
> + }
> + } else {
> + firstNodeAveragePingTimeThresholdBreak = 0;
> + }
> + if((firstNodeAveragePingTimeThresholdBreak != 0) &&
> ((now -
firstNodeAveragePingTimeThresholdBreak) >=
MAX_NODE_AVERAGE_PING_TIME_ALERT_DELAY)) {
> + nodeAveragePingAlertRelevant = true;
> + } else {
> + nodeAveragePingAlertRelevant = false;
> + }
> + if(logMINOR && Logger.shouldLog(Logger.DEBUG, this))
Logger.debug(this, "mUPMUAS: "+now+": "+getBwlimitDelayTime()+"
>? "+MAX_BWLIMIT_DELAY_TIME_ALERT_THRESHOLD+"
since "+firstBwlimitDelayTimeThresholdBreak+"
("+bwlimitDelayAlertRelevant+") "+getNodeAveragePingTime()+"
>? "+MAX_NODE_AVERAGE_PING_TIME_ALERT_THRESHOLD+"
since "+firstNodeAveragePingTimeThresholdBreak+"
("+nodeAveragePingAlertRelevant+ ')');
> + nextPeerManagerUserAlertStatsUpdateTime = now +
peerManagerUserAlertStatsUpdateInterval;
> + }
> + }
> +
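
Inline note from me: the alert bookkeeping above is a debounce; an alert only becomes relevant once the metric has stayed over its threshold continuously for the 10-minute MAX_*_ALERT_DELAY, and any dip back under resets the clock. The same pattern in isolation (class and values are my own, the 6000 ms threshold matching MAX_BWLIMIT_DELAY_TIME_ALERT_THRESHOLD = MAX_THROTTLE_DELAY*2):

    public class ThresholdDebounce {
        private final double threshold;
        private final long requiredMillis;
        private long firstBreak = 0; // 0 means currently under the threshold

        ThresholdDebounce(double threshold, long requiredMillis) {
            this.threshold = threshold;
            this.requiredMillis = requiredMillis;
        }

        /** True only once the value has been over the threshold for requiredMillis. */
        boolean update(double value, long now) {
            if (value > threshold) {
                if (firstBreak == 0) firstBreak = now;
            } else {
                firstBreak = 0; // dropping under the threshold resets the clock
            }
            return firstBreak != 0 && (now - firstBreak) >= requiredMillis;
        }

        public static void main(String[] args) {
            ThresholdDebounce d = new ThresholdDebounce(6000, 10 * 60 * 1000);
            System.out.println(d.update(9000, 1000));                   // false: just broke it
            System.out.println(d.update(9000, 1000 + 11 * 60 * 1000));  // true: over it for 11 min
        }
    }
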
> + public SimpleFieldSet persistThrottlesToFieldSet() {
> + SimpleFieldSet fs = new SimpleFieldSet(true);
> + fs.put("RemoteChkFetchBytesSentAverage",
remoteChkFetchBytesSentAverage.exportFieldSet(true));
> + fs.put("RemoteSskFetchBytesSentAverage",
remoteSskFetchBytesSentAverage.exportFieldSet(true));
> + fs.put("RemoteChkInsertBytesSentAverage",
remoteChkInsertBytesSentAverage.exportFieldSet(true));
> + fs.put("RemoteSskInsertBytesSentAverage",
remoteSskInsertBytesSentAverage.exportFieldSet(true));
> + fs.put("RemoteChkFetchBytesReceivedAverage",
remoteChkFetchBytesReceivedAverage.exportFieldSet(true));
> + fs.put("RemoteSskFetchBytesReceivedAverage",
remoteSskFetchBytesReceivedAverage.exportFieldSet(true));
> + fs.put("RemoteChkInsertBytesReceivedAverage",
remoteChkInsertBytesReceivedAverage.exportFieldSet(true));
> + fs.put("RemoteSskInsertBytesReceivedAverage",
remoteSskInsertBytesReceivedAverage.exportFieldSet(true));
> + fs.put("LocalChkFetchBytesSentAverage",
localChkFetchBytesSentAverage.exportFieldSet(true));
> + fs.put("LocalSskFetchBytesSentAverage",
localSskFetchBytesSentAverage.exportFieldSet(true));
> + fs.put("LocalChkInsertBytesSentAverage",
localChkInsertBytesSentAverage.exportFieldSet(true));
> + fs.put("LocalSskInsertBytesSentAverage",
localSskInsertBytesSentAverage.exportFieldSet(true));
> + fs.put("LocalChkFetchBytesReceivedAverage",
localChkFetchBytesReceivedAverage.exportFieldSet(true));
> + fs.put("LocalSskFetchBytesReceivedAverage",
localSskFetchBytesReceivedAverage.exportFieldSet(true));
> + fs.put("LocalChkInsertBytesReceivedAverage",
localChkInsertBytesReceivedAverage.exportFieldSet(true));
> + fs.put("LocalSskInsertBytesReceivedAverage",
localSskInsertBytesReceivedAverage.exportFieldSet(true));
> + fs.put("SuccessfulChkFetchBytesSentAverage",
successfulChkFetchBytesSentAverage.exportFieldSet(true));
> + fs.put("SuccessfulSskFetchBytesSentAverage",
successfulSskFetchBytesSentAverage.exportFieldSet(true));
> + fs.put("SuccessfulChkInsertBytesSentAverage",
successfulChkInsertBytesSentAverage.exportFieldSet(true));
> + fs.put("SuccessfulSskInsertBytesSentAverage",
successfulSskInsertBytesSentAverage.exportFieldSet(true));
> + fs.put("SuccessfulChkFetchBytesReceivedAverage",
successfulChkFetchBytesReceivedAverage.exportFieldSet(true));
> + fs.put("SuccessfulSskFetchBytesReceivedAverage",
successfulSskFetchBytesReceivedAverage.exportFieldSet(true));
> + fs.put("SuccessfulChkInsertBytesReceivedAverage",
successfulChkInsertBytesReceivedAverage.exportFieldSet(true));
> + fs.put("SuccessfulSskInsertBytesReceivedAverage",
successfulSskInsertBytesReceivedAverage.exportFieldSet(true));
> + return fs;
> + }
> +
> + /**
> + * Update the node-wide bandwidth I/O stats if the timer has expired
> + */
> + public void maybeUpdateNodeIOStats(long now) {
> + if(now > nextNodeIOStatsUpdateTime) {
> + long[] io_stats = IOStatisticCollector.getTotalIO();
> + long outdiff;
> + long indiff;
> + synchronized(ioStatSync) {
> + previous_output_stat = last_output_stat;
> + previous_input_stat = last_input_stat;
> + previous_io_stat_time = last_io_stat_time;
> + last_output_stat = io_stats[ 0 ];
> + last_input_stat = io_stats[ 1 ];
> + last_io_stat_time = now;
> + outdiff = last_output_stat -
> previous_output_stat;
> + indiff = last_input_stat - previous_input_stat;
> + }
> + if(logMINOR)
> + Logger.minor(this, "Last 2 seconds: input:
> "+indiff+"
output: "+outdiff);
> + nextNodeIOStatsUpdateTime = now +
> nodeIOStatsUpdateInterval;
> + }
> + }
> +
> + public long[] getNodeIOStats() {
> + long[] result = new long[6];
> + synchronized(ioStatSync) {
> + result[ 0 ] = previous_output_stat;
> + result[ 1 ] = previous_input_stat;
> + result[ 2 ] = previous_io_stat_time;
> + result[ 3 ] = last_output_stat;
> + result[ 4 ] = last_input_stat;
> + result[ 5 ] = last_io_stat_time;
> + }
> + return result;
> + }
> +
> + public void waitUntilNotOverloaded(boolean isInsert) {
> + while(threadLimit < getActiveThreadCount()){
> + try{
> + Thread.sleep(5000);
> + } catch (InterruptedException e) {}
> + }
> + }
> +
> + public int getActiveThreadCount() {
> + return rootThreadGroup.activeCount() -
> node.executor.waitingThreads();
> + }
> +
> + public int getThreadLimit() {
> + return threadLimit;
> + }
> +
> + public SimpleFieldSet exportVolatileFieldSet() {
> + SimpleFieldSet fs = new SimpleFieldSet(true);
> + long now = System.currentTimeMillis();
> + fs.put("isUsingWrapper", node.isUsingWrapper());
> + long nodeUptimeSeconds = 0;
> + synchronized(this) {
> + fs.put("startupTime", node.startupTime);
> + nodeUptimeSeconds = (now - node.startupTime) / 1000;
> + fs.put("uptimeSeconds", nodeUptimeSeconds);
> + }
> + fs.put("averagePingTime", getNodeAveragePingTime());
> + fs.put("bwlimitDelayTime", getBwlimitDelayTime());
> + fs.put("networkSizeEstimateSession",
> getNetworkSizeEstimate(-1));
> + int networkSizeEstimate24hourRecent =
> getNetworkSizeEstimate(now -
(24*60*60*1000)); // 24 hours
> + fs.put("networkSizeEstimate24hourRecent",
networkSizeEstimate24hourRecent);
> + int networkSizeEstimate48hourRecent =
> getNetworkSizeEstimate(now -
(48*60*60*1000)); // 48 hours
> + fs.put("networkSizeEstimate48hourRecent",
networkSizeEstimate48hourRecent);
> + fs.put("routingMissDistance",
> routingMissDistance.currentValue());
> + fs.put("backedOffPercent", backedOffPercent.currentValue());
> + fs.put("pInstantReject", pRejectIncomingInstantly());
> + fs.put("unclaimedFIFOSize", node.usm.getUnclaimedFIFOSize());
> +
> + /* gather connection statistics */
> + DarknetPeerNodeStatus[] peerNodeStatuses =
peers.getDarknetPeerNodeStatuses();
> + Arrays.sort(peerNodeStatuses, new Comparator() {
> + public int compare(Object first, Object second) {
> + DarknetPeerNodeStatus firstNode =
> (DarknetPeerNodeStatus) first;
> + DarknetPeerNodeStatus secondNode =
> (DarknetPeerNodeStatus) second;
> + int statusDifference =
> firstNode.getStatusValue() -
secondNode.getStatusValue();
> + if (statusDifference != 0) {
> + return statusDifference;
> + }
> + return
> firstNode.getName().compareToIgnoreCase(secondNode.getName());
> + }
> + });
> +
> + int numberOfConnected =
PeerNodeStatus.getPeerStatusCount(peerNodeStatuses,
PeerManager.PEER_NODE_STATUS_CONNECTED);
> + int numberOfRoutingBackedOff =
PeerNodeStatus.getPeerStatusCount(peerNodeStatuses,
PeerManager.PEER_NODE_STATUS_ROUTING_BACKED_OFF);
> + int numberOfTooNew =
> PeerNodeStatus.getPeerStatusCount(peerNodeStatuses,
PeerManager.PEER_NODE_STATUS_TOO_NEW);
> + int numberOfTooOld =
> PeerNodeStatus.getPeerStatusCount(peerNodeStatuses,
PeerManager.PEER_NODE_STATUS_TOO_OLD);
> + int numberOfDisconnected =
PeerNodeStatus.getPeerStatusCount(peerNodeStatuses,
PeerManager.PEER_NODE_STATUS_DISCONNECTED);
> + int numberOfNeverConnected =
PeerNodeStatus.getPeerStatusCount(peerNodeStatuses,
PeerManager.PEER_NODE_STATUS_NEVER_CONNECTED);
> + int numberOfDisabled =
PeerNodeStatus.getPeerStatusCount(peerNodeStatuses,
PeerManager.PEER_NODE_STATUS_DISABLED);
> + int numberOfBursting =
PeerNodeStatus.getPeerStatusCount(peerNodeStatuses,
PeerManager.PEER_NODE_STATUS_BURSTING);
> + int numberOfListening =
PeerNodeStatus.getPeerStatusCount(peerNodeStatuses,
PeerManager.PEER_NODE_STATUS_LISTENING);
> + int numberOfListenOnly =
PeerNodeStatus.getPeerStatusCount(peerNodeStatuses,
PeerManager.PEER_NODE_STATUS_LISTEN_ONLY);
> +
> + int numberOfSimpleConnected = numberOfConnected +
numberOfRoutingBackedOff;
> + int numberOfNotConnected = numberOfTooNew + numberOfTooOld +
numberOfDisconnected + numberOfNeverConnected + numberOfDisabled +
numberOfBursting + numberOfListening + numberOfListenOnly;
> +
> + fs.put("numberOfConnected", numberOfConnected);
> + fs.put("numberOfRoutingBackedOff", numberOfRoutingBackedOff);
> + fs.put("numberOfTooNew", numberOfTooNew);
> + fs.put("numberOfTooOld", numberOfTooOld);
> + fs.put("numberOfDisconnected", numberOfDisconnected);
> + fs.put("numberOfNeverConnected", numberOfNeverConnected);
> + fs.put("numberOfDisabled", numberOfDisabled);
> + fs.put("numberOfBursting", numberOfBursting);
> + fs.put("numberOfListening", numberOfListening);
> + fs.put("numberOfListenOnly", numberOfListenOnly);
> +
> + fs.put("numberOfSimpleConnected", numberOfSimpleConnected);
> + fs.put("numberOfNotConnected", numberOfNotConnected);
> +
> + fs.put("numberOfInsertSenders", node.getNumInsertSenders());
> + fs.put("numberOfRequestSenders", node.getNumRequestSenders());
> + fs.put("numberOfTransferringRequestSenders",
node.getNumTransferringRequestSenders());
> + fs.put("numberOfARKFetchers", node.getNumARKFetchers());
> +
> + long[] total = IOStatisticCollector.getTotalIO();
> + long total_output_rate = (total[0]) / nodeUptimeSeconds;
> + long total_input_rate = (total[1]) / nodeUptimeSeconds;
> + long totalPayloadOutput = node.getTotalPayloadSent();
> + long total_payload_output_rate = totalPayloadOutput /
> nodeUptimeSeconds;
> + int total_payload_output_percent = (int) (100 *
> totalPayloadOutput /
total[0]);
> + fs.put("totalOutputBytes", total[0]);
> + fs.put("totalOutputRate", total_output_rate);
> + fs.put("totalPayloadOutputBytes", totalPayloadOutput);
> + fs.put("totalPayloadOutputRate", total_payload_output_rate);
> + fs.put("totalPayloadOutputPercent",
> total_payload_output_percent);
> + fs.put("totalInputBytes", total[1]);
> + fs.put("totalInputRate", total_input_rate);
> +
> + long[] rate = getNodeIOStats();
> + long deltaMS = (rate[5] - rate[2]);
> + double recent_output_rate = 1000.0 * (rate[3] - rate[0]) /
> deltaMS;
> + double recent_input_rate = 1000.0 * (rate[4] - rate[1]) /
> deltaMS;
> + fs.put("recentOutputRate", recent_output_rate);
> + fs.put("recentInputRate", recent_input_rate);
> +
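
Last aside from me: recentOutputRate/recentInputRate are just the difference between the two most recent I/O snapshots scaled to bytes per second, so with the 2-second update interval in maybeUpdateNodeIOStats() they are roughly a two-second average. For example (the snapshot numbers are invented):

    public class RecentRateExample {
        public static void main(String[] args) {
            // Two hypothetical snapshots of total bytes sent, taken 2000 ms apart.
            long prevOutput = 1000000, lastOutput = 1060000;
            long prevTime = 10000, lastTime = 12000; // milliseconds
            double recentOutputRate = 1000.0 * (lastOutput - prevOutput) / (lastTime - prevTime);
            System.out.println(recentOutputRate + " bytes/sec"); // 60000 bytes over 2 s -> 30000.0
        }
    }
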
> + String [] routingBackoffReasons =
peers.getPeerNodeRoutingBackoffReasons();
> + if(routingBackoffReasons.length != 0) {
> + for(int i=0;i<routingBackoffReasons.length;i++) {
> + fs.put("numberWithRoutingBackoffReasons." +
> routingBackoffReasons[i],
peers.getPeerNodeRoutingBackoffReasonSize(routingBackoffReasons[i]));
> + }
> + }
> +
> + double swaps = (double)node.getSwaps();
> + double noSwaps = (double)node.getNoSwaps();
> + double numberOfRemotePeerLocationsSeenInSwaps =
(double)node.getNumberOfRemotePeerLocationsSeenInSwaps();
> + fs.putSingle("numberOfRemotePeerLocationsSeenInSwaps",
Double.toString(numberOfRemotePeerLocationsSeenInSwaps));
> + double avgConnectedPeersPerNode = 0.0;
> + if ((numberOfRemotePeerLocationsSeenInSwaps > 0.0) && ((swaps >
> 0.0) ||
(noSwaps > 0.0))) {
> + avgConnectedPeersPerNode =
numberOfRemotePeerLocationsSeenInSwaps/(swaps+noSwaps);
> + }
> + fs.putSingle("avgConnectedPeersPerNode",
Double.toString(avgConnectedPeersPerNode));
> +
> + int startedSwaps = node.getStartedSwaps();
> + int swapsRejectedAlreadyLocked =
> node.getSwapsRejectedAlreadyLocked();
> + int swapsRejectedNowhereToGo =
> node.getSwapsRejectedNowhereToGo();
> + int swapsRejectedRateLimit = node.getSwapsRejectedRateLimit();
> + int swapsRejectedLoop = node.getSwapsRejectedLoop();
> + int swapsRejectedRecognizedID =
> node.getSwapsRejectedRecognizedID();
> + double locationChangePerSession =
> node.getLocationChangeSession();
> + double locationChangePerSwap = 0.0;
> + double locationChangePerMinute = 0.0;
> + double swapsPerMinute = 0.0;
> + double noSwapsPerMinute = 0.0;
> + double swapsPerNoSwaps = 0.0;
> + if (swaps > 0) {
> + locationChangePerSwap = locationChangePerSession/swaps;
> + }
> + if ((swaps > 0.0) && (nodeUptimeSeconds >= 60)) {
> + locationChangePerMinute =
> locationChangePerSession/(double)
(nodeUptimeSeconds/60.0);
> + }
> + if ((swaps > 0.0) && (nodeUptimeSeconds >= 60)) {
> + swapsPerMinute = swaps/(double)(nodeUptimeSeconds/60.0);
> + }
> + if ((noSwaps > 0.0) && (nodeUptimeSeconds >= 60)) {
> + noSwapsPerMinute =
> noSwaps/(double)(nodeUptimeSeconds/60.0);
> + }
> + if ((swaps > 0.0) && (noSwaps > 0.0)) {
> + swapsPerNoSwaps = swaps/noSwaps;
> + }
> + fs.put("locationChangePerSession", locationChangePerSession);
> + fs.put("locationChangePerSwap", locationChangePerSwap);
> + fs.put("locationChangePerMinute", locationChangePerMinute);
> + fs.put("swapsPerMinute", swapsPerMinute);
> + fs.put("noSwapsPerMinute", noSwapsPerMinute);
> + fs.put("swapsPerNoSwaps", swapsPerNoSwaps);
> + fs.put("swaps", swaps);
> + fs.put("noSwaps", noSwaps);
> + fs.put("startedSwaps", startedSwaps);
> + fs.put("swapsRejectedAlreadyLocked",
> swapsRejectedAlreadyLocked);
> + fs.put("swapsRejectedNowhereToGo", swapsRejectedNowhereToGo);
> + fs.put("swapsRejectedRateLimit", swapsRejectedRateLimit);
> + fs.put("swapsRejectedLoop", swapsRejectedLoop);
> + fs.put("swapsRejectedRecognizedID", swapsRejectedRecognizedID);
> + long fix32kb = 32 * 1024;
> + long cachedKeys = node.getChkDatacache().keyCount();
> + long cachedSize = cachedKeys * fix32kb;
> + long storeKeys = node.getChkDatastore().keyCount();
> + long storeSize = storeKeys * fix32kb;
> + long overallKeys = cachedKeys + storeKeys;
> + long overallSize = cachedSize + storeSize;
> +
> + long maxOverallKeys = node.getMaxTotalKeys();
> + long maxOverallSize = maxOverallKeys * fix32kb;
> +
> + double percentOverallKeysOfMax = (double)
(overallKeys*100)/(double)maxOverallKeys;
> +
> + long cachedStoreHits = node.getChkDatacache().hits();
> + long cachedStoreMisses = node.getChkDatacache().misses();
> + long cacheAccesses = cachedStoreHits + cachedStoreMisses;
> + double percentCachedStoreHitsOfAccesses =
> (double)(cachedStoreHits*100) /
(double)cacheAccesses;
> + long storeHits = node.getChkDatastore().hits();
> + long storeMisses = node.getChkDatastore().misses();
> + long storeAccesses = storeHits + storeMisses;
> + double percentStoreHitsOfAccesses = (double)(storeHits*100) /
(double)storeAccesses;
> + long overallAccesses = storeAccesses + cacheAccesses;
> + double avgStoreAccessRate =
(double)overallAccesses/(double)nodeUptimeSeconds;
> +
> + fs.put("cachedKeys", cachedKeys);
> + fs.put("cachedSize", cachedSize);
> + fs.put("storeKeys", storeKeys);
> + fs.put("storeSize", storeSize);
> + fs.put("overallKeys", overallKeys);
> + fs.put("overallSize", overallSize);
> + fs.put("maxOverallKeys", maxOverallKeys);
> + fs.put("maxOverallSize", maxOverallSize);
> + fs.put("percentOverallKeysOfMax", percentOverallKeysOfMax);
> + fs.put("cachedStoreHits", cachedStoreHits);
> + fs.put("cachedStoreMisses", cachedStoreMisses);
> + fs.put("cacheAccesses", cacheAccesses);
> + fs.put("percentCachedStoreHitsOfAccesses",
percentCachedStoreHitsOfAccesses);
> + fs.put("storeHits", storeHits);
> + fs.put("storeMisses", storeMisses);
> + fs.put("storeAccesses", storeAccesses);
> + fs.put("percentStoreHitsOfAccesses",
> percentStoreHitsOfAccesses);
> + fs.put("overallAccesses", overallAccesses);
> + fs.put("avgStoreAccessRate", avgStoreAccessRate);
> +
> + Runtime rt = Runtime.getRuntime();
> + float freeMemory = (float) rt.freeMemory();
> + float totalMemory = (float) rt.totalMemory();
> + float maxMemory = (float) rt.maxMemory();
> +
> + long usedJavaMem = (long)(totalMemory - freeMemory);
> + long allocatedJavaMem = (long)totalMemory;
> + long maxJavaMem = (long)maxMemory;
> + int availableCpus = rt.availableProcessors();
> +
> + fs.put("freeJavaMemory", (long)freeMemory);
> + fs.put("usedJavaMemory", usedJavaMem);
> + fs.put("allocatedJavaMemory", allocatedJavaMem);
> + fs.put("maximumJavaMemory", maxJavaMem);
> + fs.put("availableCPUs", availableCpus);
> + fs.put("runningThreadCount", getActiveThreadCount());
> +
> + return fs;
> + }
> +
> + public void setOutputLimit(int obwLimit) {
> + requestOutputThrottle.changeNanosAndBucketSize((int)
((1000L*1000L*1000L) / (obwLimit * FRACTION_OF_BANDWIDTH_USED_BY_REQUESTS)),
Math.max(obwLimit*60, 32768*20));
> + if(node.inputLimitDefault) {
> + setInputLimit(obwLimit * 4);
> + }
> + }
> +
> + public void setInputLimit(int ibwLimit) {
> +
> requestInputThrottle.changeNanosAndBucketSize((int)((1000L*1000L*1000L) /
(ibwLimit * FRACTION_OF_BANDWIDTH_USED_BY_REQUESTS)), Math.max(ibwLimit*60,
32768*20));
> + }
> +
> + public boolean isTestnetEnabled() {
> + return node.isTestnetEnabled();
> + }
> +
> + public boolean getRejectReasonsTable(HTMLNode table) {
> + return preemptiveRejectReasons.toTableRows(table) > 0;
> + }
> +
> + public boolean getLocalRejectReasonsTable(HTMLNode table) {
> + return localPreemptiveRejectReasons.toTableRows(table) > 0;
> + }
> +
> + public void requestCompleted(boolean succeeded, boolean isRemote,
> boolean
isSSK) {
> + globalFetchPSuccess.report(succeeded ? 1.0 : 0.0);
> + if(isSSK)
> + sskFetchPSuccess.report(succeeded ? 1.0 : 0.0);
> + else
> + chkFetchPSuccess.report(succeeded ? 1.0 : 0.0);
> + if(isRemote)
> + remoteFetchPSuccess.report(succeeded ? 1.0 : 0.0);
> + else
> + localFetchPSuccess.report(succeeded ? 1.0 : 0.0);
> + }
> +
> + private final DecimalFormat fix3p3pct = new DecimalFormat("##0.000%");
> + private final NumberFormat thousendPoint = NumberFormat.getInstance();
> +
> + public void fillSuccessRateBox(HTMLNode parent) {
> + HTMLNode list = parent.addChild("table", "border", "0");
> + final TrivialRunningAverage[] averages = new
> TrivialRunningAverage[] {
> + globalFetchPSuccess,
> + chkFetchPSuccess,
> + sskFetchPSuccess,
> + localFetchPSuccess,
> + remoteFetchPSuccess
> + };
> + final String[] names = new String[] {
> + // FIXME l10n, but atm this only shows up in
> advanced mode
> + "All requests",
> + "CHKs",
> + "SSKs",
> + "Local requests",
> + "Remote requests"
> + };
> + HTMLNode row = list.addChild("tr");
> + row.addChild("th", "Group");
> + row.addChild("th", "P(success)");
> + row.addChild("th", "Count");
> +
> + for(int i=0;i<averages.length;i++) {
> + row = list.addChild("tr");
> + row.addChild("td", names[i]);
> + if (averages[i].countReports()==0) {
> + row.addChild("td", "-");
> + row.addChild("td", "0");
> + } else {
> + row.addChild("td",
> fix3p3pct.format(averages[i].currentValue()));
> + row.addChild("td",
> thousendPoint.format(averages[i].countReports()));
> + }
> + }
> + }
> +
> +}
>