Repository: reef Updated Branches: refs/heads/master af63bf639 -> 0b8da4cfc
// http://git-wip-us.apache.org/repos/asf/reef/blob/0b8da4cf/lang/cs/Org.Apache.REEF.Common/metrics/MutableMetricsLayer/MutableMetricBase.cs
// ----------------------------------------------------------------------
// diff --git a/lang/cs/Org.Apache.REEF.Common/metrics/MutableMetricsLayer/MutableMetricBase.cs b/lang/cs/Org.Apache.REEF.Common/metrics/MutableMetricsLayer/MutableMetricBase.cs
// new file mode 100644
// index 0000000..3453232
// --- /dev/null
// +++ b/lang/cs/Org.Apache.REEF.Common/metrics/MutableMetricsLayer/MutableMetricBase.cs
// @@ -0,0 +1,132 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

using System;
using Org.Apache.REEF.Common.Metrics.Api;

namespace Org.Apache.REEF.Common.Metrics.MutableMetricsLayer
{
    /// <summary>
    /// Base mutable metrics class. Can be instantiated only from derived classes.
    /// Holds the metric meta-data and the "changed" flag, and forwards every
    /// observer notification to the single observer registered by the derived class.
    /// </summary>
    internal class MutableMetricBase : IExtendedMutableMetric
    {
        private IObserver<SnapshotRequest> _observer;
        private volatile bool _changed;
        private IDisposable _unSubscriber;

        /// <summary>
        /// Protected constructor. A freshly constructed metric is flagged as changed.
        /// </summary>
        /// <param name="info">Meta-data info. of the metric.</param>
        protected MutableMetricBase(IMetricsInfo info)
        {
            Info = info;
            _changed = true;
        }

        /// <summary>
        /// Registers the observer with the base class. This method will typically be used by the
        /// metrics like Counter and Gauges in their constructor to register observer.
        /// </summary>
        /// <param name="observer">Observer to be registered.</param>
        /// <exception cref="MetricsException">If an observer has already been registered.</exception>
        protected void RegisterSnapshotRequestObserver(IObserver<SnapshotRequest> observer)
        {
            if (_observer != null)
            {
                throw new MetricsException("Another observer already registered with the metric");
            }
            _observer = observer;
        }

        /// <summary>
        /// Meta-data of the metric.
        /// </summary>
        public IMetricsInfo Info { get; private set; }

        /// <summary>
        /// Sets the changed flag. Called in derived classes when metric values are changed.
        /// </summary>
        public void SetChanged()
        {
            _changed = true;
        }

        /// <summary>
        /// Clears the changed flag. Called by snapshot operations after recording latest values.
        /// </summary>
        public void ClearChanged()
        {
            _changed = false;
        }

        /// <summary>
        /// True if metric value changed after taking a snapshot, false otherwise.
        /// </summary>
        public bool Changed
        {
            get { return _changed; }
        }

        /// <summary>
        /// Simply calls the OnNext() of the registered observer. No null check is
        /// performed, so an observer must have been registered beforehand.
        /// </summary>
        /// <param name="value">Snapshot request instance</param>
        public void OnNext(SnapshotRequest value)
        {
            _observer.OnNext(value);
        }

        /// <summary>
        /// Simply calls the OnError() of the registered observer. No null check is
        /// performed, so an observer must have been registered beforehand.
        /// </summary>
        /// <param name="error">Exception to forward.</param>
        public void OnError(Exception error)
        {
            _observer.OnError(error);
        }

        /// <summary>
        /// Simply calls OnCompleted() of the registered observer. No null check is
        /// performed, so an observer must have been registered beforehand.
        /// </summary>
        public void OnCompleted()
        {
            _observer.OnCompleted();
        }

        /// <summary>
        /// Used to subscribe itself with the provider, typically an <see cref="IMetricsSource"/>.
        /// A null provider is ignored and leaves the metric unsubscribed.
        /// </summary>
        /// <param name="requestor">Provider.</param>
        public void Subscribe(IObservable<SnapshotRequest> requestor)
        {
            if (requestor != null)
            {
                _unSubscriber = requestor.Subscribe(this);
            }
        }

        /// <summary>
        /// Used to un-subscribe itself from the provider, typically an <see cref="IMetricsSource"/>.
        /// Useful once the metric has outlived its utility and there is no need to transmit it.
        /// Does nothing if the metric is not currently subscribed, so it is safe to call
        /// more than once or on a metric that was never subscribed.
        /// </summary>
        public void UnSubscribe()
        {
            // Subscribe() skips null providers, so the handle may legitimately be null here.
            IDisposable unSubscriber = _unSubscriber;
            if (unSubscriber == null)
            {
                return;
            }

            // Drop the handle before disposing so a repeated call becomes a no-op.
            _unSubscriber = null;
            unSubscriber.Dispose();
        }
    }
}

// http://git-wip-us.apache.org/repos/asf/reef/blob/0b8da4cf/lang/cs/Org.Apache.REEF.Common/metrics/MutableMetricsLayer/MutableMetricContainer.cs
// ----------------------------------------------------------------------
// diff --git a/lang/cs/Org.Apache.REEF.Common/metrics/MutableMetricsLayer/MutableMetricContainer.cs b/lang/cs/Org.Apache.REEF.Common/metrics/MutableMetricsLayer/MutableMetricContainer.cs
// new file mode 100644
// index 0000000..fad5f7a
// --- /dev/null
// +++ b/lang/cs/Org.Apache.REEF.Common/metrics/MutableMetricsLayer/MutableMetricContainer.cs
// @@ -0,0 +1,111 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.
using System;
using System.Collections.Generic;
using Org.Apache.REEF.Common.Metrics.Api;

namespace Org.Apache.REEF.Common.Metrics.MutableMetricsLayer
{
    /// <summary>
    /// A container holding a collection of a particular kind of mutable metrics
    /// like counters, gauges etc., keyed by metric name. All dictionary accesses
    /// are serialized through a private lock.
    /// </summary>
    /// <typeparam name="T">Type of mutable metric.</typeparam>
    internal sealed class MutableMetricContainer<T> : IMetricContainer<T> where T : IMutableMetric
    {
        private readonly Dictionary<string, T> _metricsDictionary = new Dictionary<string, T>();
        private readonly Func<string, string, T> _createMetricFromNameDesc;
        private readonly IObservable<SnapshotRequest> _provider;
        private readonly object _lock = new object();

        /// <summary>
        /// Constructor.
        /// </summary>
        /// <param name="metricFromNameDesc">A function that creates mutable metric from name and desc.</param>
        /// <param name="provider">Observable to which each newly created metric is subscribed.</param>
        internal MutableMetricContainer(Func<string, string, T> metricFromNameDesc,
            IObservable<SnapshotRequest> provider)
        {
            _createMetricFromNameDesc = metricFromNameDesc;
            _provider = provider;
        }

        /// <summary>
        /// Creates a metric from name and description and subscribes it to the provider.
        /// Throws exception if a metric with the given name already exists.
        /// </summary>
        /// <param name="name">Name of the metric.</param>
        /// <param name="desc">Description of the metric.</param>
        /// <returns>Newly created metric.</returns>
        /// <exception cref="MetricsException">If a metric with the given name already exists.</exception>
        public T Create(string name, string desc)
        {
            lock (_lock)
            {
                if (_metricsDictionary.ContainsKey(name))
                {
                    throw new MetricsException(string.Format("Metric with name {0} already exists", name));
                }

                // Store the metric only after it has been created and subscribed, so that a
                // failure in either step does not leave a half-initialized entry behind that
                // would block a later Create() with the same name.
                T metric = _createMetricFromNameDesc(name, desc);
                metric.Subscribe(_provider);
                _metricsDictionary[name] = metric;
                return metric;
            }
        }

        /// <summary>
        /// Deletes the metric with the given name and signals completion to it.
        /// </summary>
        /// <param name="name">Name of the metric.</param>
        /// <returns>True if the metric is deleted, false if it does not exist</returns>
        public bool Delete(string name)
        {
            T entry;
            lock (_lock)
            {
                if (!_metricsDictionary.TryGetValue(name, out entry))
                {
                    return false;
                }
                _metricsDictionary.Remove(name);
            }

            // Notify the removed metric outside the lock.
            entry.OnCompleted();
            return true;
        }

        /// <summary>
        /// String based indexer. Gets metric by name. Throws exception
        /// if it does not exist.
        /// </summary>
        /// <param name="name">Name of the metric.</param>
        /// <returns>Existing metric with given name.</returns>
        /// <exception cref="MetricsException">If no metric with the given name exists.</exception>
        public T this[string name]
        {
            get
            {
                lock (_lock)
                {
                    T metric;
                    if (!_metricsDictionary.TryGetValue(name, out metric))
                    {
                        throw new MetricsException(string.Format("Metric with name {0} does not exist", name));
                    }
                    return metric;
                }
            }
        }
    }
}

// http://git-wip-us.apache.org/repos/asf/reef/blob/0b8da4cf/lang/cs/Org.Apache.REEF.Common/metrics/MutableMetricsLayer/MutableRate.cs
// ----------------------------------------------------------------------
// diff --git a/lang/cs/Org.Apache.REEF.Common/metrics/MutableMetricsLayer/MutableRate.cs b/lang/cs/Org.Apache.REEF.Common/metrics/MutableMetricsLayer/MutableRate.cs
// new file mode 100644
// index 0000000..ee87176
// --- /dev/null
// +++ b/lang/cs/Org.Apache.REEF.Common/metrics/MutableMetricsLayer/MutableRate.cs
// @@ -0,0 +1,50 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.
// You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

using Org.Apache.REEF.Common.Metrics.Api;

namespace Org.Apache.REEF.Common.Metrics.MutableMetricsLayer
{
    /// <summary>
    /// A convenient metric for throughput measurement. The ValueName field in
    /// <see cref="MutableStat"/> is set to "Time".
    /// </summary>
    internal sealed class MutableRate : MutableStat, IRate
    {
        /// <summary>
        /// Constructor.
        /// </summary>
        /// <param name="info">Meta-data of the metric.</param>
        /// <param name="extended">If false, outputs only current mean, otherwise outputs
        /// everything (mean, variance, min, max overall and of current interval).</param>
        public MutableRate(IMetricsInfo info, bool extended = true)
            : base(info, "Time", extended)
        {
        }

        /// <summary>
        /// Constructor. The name is also used as the description of the metric.
        /// </summary>
        /// <param name="name">Name of the metric.</param>
        /// <param name="extended">If false, outputs only current mean, otherwise outputs
        /// everything (mean, variance, min, max overall and of current interval).</param>
        public MutableRate(string name, bool extended = true)
            : base(new MetricsInfoImpl(name, name), "Time", extended)
        {
        }
    }
}

// http://git-wip-us.apache.org/repos/asf/reef/blob/0b8da4cf/lang/cs/Org.Apache.REEF.Common/metrics/MutableMetricsLayer/MutableStat.cs
// ----------------------------------------------------------------------
// diff --git a/lang/cs/Org.Apache.REEF.Common/metrics/MutableMetricsLayer/MutableStat.cs b/lang/cs/Org.Apache.REEF.Common/metrics/MutableMetricsLayer/MutableStat.cs
// new file mode 100644
// index 0000000..4c6d43a
// --- /dev/null
// +++ b/lang/cs/Org.Apache.REEF.Common/metrics/MutableMetricsLayer/MutableStat.cs
// @@ -0,0 +1,176 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

using System;
using System.Reactive;
using Org.Apache.REEF.Common.Metrics.Api;
using Org.Apache.REEF.Utilities.Logging;

namespace Org.Apache.REEF.Common.Metrics.MutableMetricsLayer
{
    /// <summary>
    /// Implementation for the stat metric.
    /// Samples will be
    /// added by <see cref="Sample"/> function call.
    /// Keeps running statistics over all samples, statistics for the current
    /// snapshot interval, and a copy of the statistics of the previous interval.
    /// </summary>
    internal class MutableStat : MutableMetricBase, IStat
    {
        private readonly IMetricsInfo _numSamplesInfo;
        private readonly IMetricsInfo _runningMeanInfo;
        private readonly IMetricsInfo _currentMeanInfo;
        private readonly IMetricsInfo _runningMinInfo;
        private readonly IMetricsInfo _runningMaxInfo;
        private readonly IMetricsInfo _currentMinInfo;
        private readonly IMetricsInfo _currentMaxInfo;
        private readonly IMetricsInfo _runningStdInfo;
        private readonly IMetricsInfo _currentStdInfo;
        private readonly object _lock = new object();

        private readonly StatsHelperClass _runningStat = new StatsHelperClass();
        private readonly StatsHelperClass _intervalStat = new StatsHelperClass();
        private readonly StatsHelperClass _prevStat = new StatsHelperClass();
        private readonly bool _showExtendedStats;
        private static readonly Logger Logger = Logger.GetLogger(typeof(MutableStat));

        /// <summary>
        /// Value this stat represents (Time, Latency etc.). Used to generate description
        /// for average, variance metrics etc. For example: "Average Time for ...." or
        /// "Average Latency for ........".
        /// </summary>
        protected readonly string ValueName;

        /// <summary>
        /// Constructor. Derives the meta-data of every emitted sub-metric from
        /// <paramref name="info"/> and registers the snapshot observer with the base class.
        /// </summary>
        /// <param name="info">Meta data of the stat.</param>
        /// <param name="valueName">Value (e.g. time, latency) the stat represents.</param>
        /// <param name="extendedStats">Whether to show only mean (false) or mean,
        /// stdev, min, max etc.</param>
        public MutableStat(IMetricsInfo info, string valueName, bool extendedStats = true)
            : base(info)
        {
            ValueName = valueName;
            _showExtendedStats = extendedStats;

            string name = info.Name + "-Num";
            string desc = "Number of samples for " + info.Description;
            _numSamplesInfo = new MetricsInfoImpl(name, desc);

            name = info.Name + "-RunningAvg";
            desc = "Average " + valueName + " for " + info.Description;
            _runningMeanInfo = new MetricsInfoImpl(name, desc);

            name = info.Name + "-RunningStdev";
            desc = "Standard deviation of " + valueName + " for " + info.Description;
            _runningStdInfo = new MetricsInfoImpl(name, desc);

            name = info.Name + "-IntervalAvg";
            desc = "Interval Average " + valueName + " for " + info.Description;
            _currentMeanInfo = new MetricsInfoImpl(name, desc);

            name = info.Name + "-IntervalStdev";
            desc = "Interval Standard deviation of " + valueName + " for " + info.Description;
            _currentStdInfo = new MetricsInfoImpl(name, desc);

            name = info.Name + "-RunningMin";
            desc = "Min " + valueName + " for " + info.Description;
            _runningMinInfo = new MetricsInfoImpl(name, desc);

            name = info.Name + "-RunningMax";
            desc = "Max " + valueName + " for " + info.Description;
            _runningMaxInfo = new MetricsInfoImpl(name, desc);

            name = info.Name + "-IntervalMin";
            desc = "Interval Min " + valueName + " for " + info.Description;
            _currentMinInfo = new MetricsInfoImpl(name, desc);

            name = info.Name + "-IntervalMax";
            desc = "Interval Max " + valueName + " for " + info.Description;
            _currentMaxInfo = new MetricsInfoImpl(name, desc);

            // Snapshot requests go to GiveSnapshot, errors are logged, and completion
            // un-subscribes this metric from its provider.
            RegisterSnapshotRequestObserver(Observer.Create<SnapshotRequest>(this.GiveSnapshot,
                this.SnapshotError,
                UnSubscribe));
        }

        /// <summary>
        /// Constructor. The name is also used as the description of the stat.
        /// </summary>
        /// <param name="name">Name of the stat.</param>
        /// <param name="valueName">Value (e.g. time, latency) the stat represents.</param>
        /// <param name="extendedStats">Whether to show only mean (false) or mean,
        /// stdev, min, max etc.</param>
        public MutableStat(string name, string valueName, bool extendedStats = true)
            : this(new MetricsInfoImpl(name, name), valueName, extendedStats)
        {
        }

        /// <summary>
        /// Adds a sample to the stat and flags the metric as changed.
        /// </summary>
        /// <param name="value">Value of the sample.</param>
        public void Sample(double value)
        {
            lock (_lock)
            {
                _runningStat.Add(value);
                _intervalStat.Add(value);
                SetChanged();
            }
        }

        /// <summary>
        /// Writes the stat into the record builder of the request. Values are emitted
        /// when the request asks for a full snapshot or when the metric has changed
        /// since the last snapshot. After emitting a changed metric, the current
        /// interval is saved as the previous interval and then reset.
        /// </summary>
        /// <param name="request">Snapshot request carrying the record builder.</param>
        private void GiveSnapshot(SnapshotRequest request)
        {
            bool all = request.FullSnapshot;
            var recordBuilder = request.Builder;
            lock (_lock)
            {
                // Read the flag once so the whole snapshot is based on one consistent view.
                bool changed = Changed;
                if (!all && !changed)
                {
                    return;
                }

                // If nothing changed, the current interval has already been reset by the
                // previous snapshot; report the last completed interval instead.
                var lastStat = changed ? _intervalStat : _prevStat;

                recordBuilder.AddCounter(_numSamplesInfo, _runningStat.NumSamples)
                    .AddGauge(_currentMeanInfo, lastStat.Mean);

                if (_showExtendedStats)
                {
                    // All interval values must come from the same interval as the mean above.
                    recordBuilder.AddGauge(_currentMaxInfo, lastStat.MinMaxModule.Max)
                        .AddGauge(_currentMinInfo, lastStat.MinMaxModule.Min)
                        .AddGauge(_currentStdInfo, lastStat.Std)
                        .AddGauge(_runningMaxInfo, _runningStat.MinMaxModule.Max)
                        .AddGauge(_runningMinInfo, _runningStat.MinMaxModule.Min)
                        .AddGauge(_runningMeanInfo, _runningStat.Mean)
                        .AddGauge(_runningStdInfo, _runningStat.Std);
                }

                if (changed)
                {
                    if (_runningStat.NumSamples > 0)
                    {
                        _prevStat.CopyFrom(_intervalStat);
                        _intervalStat.Reset();
                    }
                    ClearChanged();
                }
            }
        }

        /// <summary>
        /// Logs an error reported by the snapshot provider, including the exception.
        /// </summary>
        /// <param name="e">Exception reported by the provider.</param>
        private void SnapshotError(Exception e)
        {
            Logger.Log(Level.Error, "Exception happened while trying to take the snapshot", e);
        }
    }
}

// http://git-wip-us.apache.org/repos/asf/reef/blob/0b8da4cf/lang/cs/Org.Apache.REEF.Common/metrics/MutableMetricsLayer/StatsHelperClass.cs
// ----------------------------------------------------------------------
// diff --git
// a/lang/cs/Org.Apache.REEF.Common/metrics/MutableMetricsLayer/StatsHelperClass.cs b/lang/cs/Org.Apache.REEF.Common/metrics/MutableMetricsLayer/StatsHelperClass.cs
// new file mode 100644
// index 0000000..c6b7065
// --- /dev/null
// +++ b/lang/cs/Org.Apache.REEF.Common/metrics/MutableMetricsLayer/StatsHelperClass.cs
// @@ -0,0 +1,175 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

using System;

namespace Org.Apache.REEF.Common.Metrics.MutableMetricsLayer
{
    /// <summary>
    /// Helper class for <see cref="MutableStat"/> that accumulates the sample
    /// count, mean, variance, minimum and maximum of the values added to it.
    /// </summary>
    internal sealed class StatsHelperClass
    {
        private double _mean = 0;
        private double _unNormalizedVariance = 0;
        private long _numSamples = 0;
        private readonly MinMax _minMax = new MinMax();

        /// <summary>
        /// Overwrites the current stat values with those of another instance.
        /// </summary>
        /// <param name="other">Instance from which to copy.</param>
        internal void CopyFrom(StatsHelperClass other)
        {
            long copiedCount = other.NumSamples;
            _numSamples = copiedCount;
            _mean = other.Mean;

            // Rebuild the un-normalized sum from the source's normalized variance.
            _unNormalizedVariance = other.Variance * (copiedCount - 1);
            _minMax.Reset(other.MinMaxModule);
        }

        /// <summary>
        /// Mean of the samples added so far.
        /// </summary>
        internal double Mean
        {
            get { return _mean; }
        }

        /// <summary>
        /// Number of samples added so far.
        /// </summary>
        internal long NumSamples
        {
            get { return _numSamples; }
        }

        /// <summary>
        /// Sample variance (normalized by count minus one); zero when fewer
        /// than two samples have been added.
        /// </summary>
        internal double Variance
        {
            get
            {
                if (_numSamples > 1)
                {
                    return _unNormalizedVariance / (_numSamples - 1);
                }
                return 0;
            }
        }

        /// <summary>
        /// Standard deviation of the samples, i.e. the square root of <see cref="Variance"/>.
        /// </summary>
        internal double Std
        {
            get
            {
                double variance = Variance;
                return Math.Sqrt(variance);
            }
        }

        /// <summary>
        /// The <see cref="MinMax"/> instance tracking the min and max values.
        /// </summary>
        internal MinMax MinMaxModule
        {
            get { return _minMax; }
        }

        /// <summary>
        /// Clears all accumulated values, including min and max.
        /// </summary>
        internal void Reset()
        {
            _numSamples = 0;
            _mean = 0;
            _unNormalizedVariance = 0;
            _minMax.Reset();
        }

        /// <summary>
        /// Folds a new value into all the stats.
        /// Uses the Welford method for numerical stability.
        /// </summary>
        /// <param name="value">Value to add.</param>
        /// <returns>Returns self.</returns>
        internal StatsHelperClass Add(double value)
        {
            _minMax.Add(value);
            _numSamples++;

            // The very first sample only seeds the mean.
            if (_numSamples == 1)
            {
                _mean = value;
                return this;
            }

            double previousMean = _mean;
            _mean = previousMean + (value - previousMean) / _numSamples;
            _unNormalizedVariance += (value - previousMean) * (value - _mean);
            return this;
        }

        /// <summary>
        /// Helper class for keeping min and max values.
        /// </summary>
        internal sealed class MinMax
        {
            // Starting points chosen so that the first added value replaces both of them.
            const double InitialMin = double.MaxValue;
            const double InitialMax = double.MinValue;

            private double _min = InitialMin;
            private double _max = InitialMax;

            /// <summary>
            /// Folds a new value into the tracked min and max.
            /// </summary>
            /// <param name="value">Value to add.</param>
            public void Add(double value)
            {
                double lowest = Math.Min(_min, value);
                double highest = Math.Max(_max, value);
                _min = lowest;
                _max = highest;
            }

            /// <summary>
            /// Restores min and max to their initial defaults.
            /// </summary>
            public void Reset()
            {
                _min = InitialMin;
                _max = InitialMax;
            }

            /// <summary>
            /// Takes over the min and max values of another instance.
            /// </summary>
            /// <param name="other">Instance from which to reset.</param>
            public void Reset(MinMax other)
            {
                _min = other.Min;
                _max = other.Max;
            }

            /// <summary>
            /// Minimum value.
            /// </summary>
            public double Min
            {
                get { return _min; }
            }

            /// <summary>
            /// Maximum value.
            /// </summary>
            public double Max
            {
                get { return _max; }
            }
        }
    }
}
// \ No newline at end of file
