Repository: reef Updated Branches: refs/heads/master 39adc451b -> b90a3f690
[REEF-1735] Define IMetricsSink interface * Add IMetricsSink interface and default impl * Add named parameter for MetricsSinks * Add MetricsSinks to MetricsService * Modify test cases JIRA: [REEF-1735](https://issues.apache.org/jira/browse/REEF-1735) Pull request This closes #1262 Project: http://git-wip-us.apache.org/repos/asf/reef/repo Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/b90a3f69 Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/b90a3f69 Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/b90a3f69 Branch: refs/heads/master Commit: b90a3f690e859fa23319abe33fdbc259007fa393 Parents: 39adc45 Author: Julia Wang <[email protected]> Authored: Tue Mar 14 11:52:53 2017 -0700 Committer: Markus Weimer <[email protected]> Committed: Thu May 11 19:39:00 2017 -0700 ---------------------------------------------------------------------- .../Org.Apache.REEF.Common.csproj | 6 ++ .../Properties/AssemblyInfo.cs | 6 ++ .../Telemetry/CounterData.cs | 77 ++++++++++++++ .../Telemetry/CounterSinkThreshold.cs | 26 +++++ .../Telemetry/CountersData.cs | 103 +++++++++++++++++++ .../Telemetry/DefaultMetricsSink.cs | 57 ++++++++++ .../Telemetry/IMetricsSink.cs | 33 ++++++ .../Telemetry/MetricSinks.cs | 31 ++++++ .../Telemetry/MetricsService.cs | 63 +++++++++--- .../MetricsServiceConfigurationModule.cs | 8 ++ .../Functional/Telemetry/TestMetricsMessage.cs | 6 +- 11 files changed, 399 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/reef/blob/b90a3f69/lang/cs/Org.Apache.REEF.Common/Org.Apache.REEF.Common.csproj ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Common/Org.Apache.REEF.Common.csproj b/lang/cs/Org.Apache.REEF.Common/Org.Apache.REEF.Common.csproj index 8feb0e1..4bdf723 100644 --- a/lang/cs/Org.Apache.REEF.Common/Org.Apache.REEF.Common.csproj +++ b/lang/cs/Org.Apache.REEF.Common/Org.Apache.REEF.Common.csproj @@ -247,12 +247,18 @@ under the License. <Compile Include="Tasks\TaskConfigurationOptions.cs" /> <Compile Include="Tasks\TaskMessage.cs" /> <Compile Include="Telemetry\Counter.cs" /> + <Compile Include="Telemetry\CounterData.cs" /> + <Compile Include="Telemetry\CountersData.cs" /> <Compile Include="Telemetry\Counters.cs" /> + <Compile Include="Telemetry\CounterSinkThreshold.cs" /> + <Compile Include="Telemetry\DefaultMetricsSink.cs" /> <Compile Include="Telemetry\EvaluatorMetrics.cs" /> <Compile Include="Telemetry\ICounter.cs" /> <Compile Include="Telemetry\ICounters.cs" /> <Compile Include="Telemetry\IEvaluatorMetrics.cs" /> + <Compile Include="Telemetry\IMetricsSink.cs" /> <Compile Include="Telemetry\MessageSenderConfigurationModule.cs" /> + <Compile Include="Telemetry\MetricSinks.cs" /> <Compile Include="Telemetry\MetricsMessageSender.cs" /> <Compile Include="Telemetry\MetricsService.cs" /> </ItemGroup> http://git-wip-us.apache.org/repos/asf/reef/blob/b90a3f69/lang/cs/Org.Apache.REEF.Common/Properties/AssemblyInfo.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Common/Properties/AssemblyInfo.cs b/lang/cs/Org.Apache.REEF.Common/Properties/AssemblyInfo.cs index 4a8c546..d126a7e 100644 --- a/lang/cs/Org.Apache.REEF.Common/Properties/AssemblyInfo.cs +++ b/lang/cs/Org.Apache.REEF.Common/Properties/AssemblyInfo.cs @@ -62,6 +62,12 @@ using System.Runtime.InteropServices; "b7c717846a897e11dd22eb260a7ce2da2dccf0263ea63e2b3f7dac24f28882aa568ef544341d17" + "618392a1095f4049ad079d4f4f0b429bb535699155fd6a7652ec7d6c1f1ba2b560f11ef3a86b5945d288cf")] +[assembly: InternalsVisibleTo("Org.Apache.REEF.Tests, publickey=" + + "00240000048000009400000006020000002400005253413100040000010001005df3e621d886a9" + + "9c03469d0f93a9f5d45aa2c883f50cd158759e93673f759ec4657fd84cc79d2db38ef1a2d914cc" + + "b7c717846a897e11dd22eb260a7ce2da2dccf0263ea63e2b3f7dac24f28882aa568ef544341d17" + + "618392a1095f4049ad079d4f4f0b429bb535699155fd6a7652ec7d6c1f1ba2b560f11ef3a86b5945d288cf")] + // Allow NSubstitute to create proxy implementations [assembly: InternalsVisibleTo("DynamicProxyGenAssembly2, PublicKey=002400000480000" + "0940000000602000000240000525341310004000001000100c547cac37abd99c8db225ef2f6c8a36" + http://git-wip-us.apache.org/repos/asf/reef/blob/b90a3f69/lang/cs/Org.Apache.REEF.Common/Telemetry/CounterData.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Common/Telemetry/CounterData.cs b/lang/cs/Org.Apache.REEF.Common/Telemetry/CounterData.cs new file mode 100644 index 0000000..5f23262 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Common/Telemetry/CounterData.cs @@ -0,0 +1,77 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +using System.Collections.Generic; + +namespace Org.Apache.REEF.Common.Telemetry +{ + /// <summary> + /// This class wraps a Counter object and the increment value since last sink + /// </summary> + internal sealed class CounterData + { + /// <summary> + /// Counter object + /// </summary> + private ICounter _counter; + + /// <summary> + /// Counter increment value since last sink + /// </summary> + internal int IncrementSinceLastSink { get; private set; } + + /// <summary> + /// Constructor for CounterData + /// </summary> + /// <param name="counter"></param> + /// <param name="initialValue"></param> + internal CounterData(ICounter counter, int initialValue) + { + _counter = counter; + IncrementSinceLastSink = initialValue; + } + + /// <summary> + /// clear the increment since last sink + /// </summary> + internal void ResetSinceLastSink() + { + IncrementSinceLastSink = 0; + } + + internal void UpdateCounter(ICounter counter) + { + IncrementSinceLastSink += counter.Value - _counter.Value; + + //// TODO: [REEF-1748] The following cases need to be considered in determine how to update the counter: + //// if evaluator contains the aggregated values, the value will override existing value + //// if evaluator only keep delta, the value should be added at here. But the value in the evaluator should be reset after message is sent + //// For the counters from multiple evaluators with the same counter name, the value should be aggregated here + //// We also need to consider failure cases. + _counter = counter; + } + + /// <summary> + /// Get count name and value as KeyValuePair + /// </summary> + /// <returns></returns> + internal KeyValuePair<string, string> GetKeyValuePair() + { + return new KeyValuePair<string, string>(_counter.Name, _counter.Value.ToString()); + } + } +} http://git-wip-us.apache.org/repos/asf/reef/blob/b90a3f69/lang/cs/Org.Apache.REEF.Common/Telemetry/CounterSinkThreshold.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Common/Telemetry/CounterSinkThreshold.cs b/lang/cs/Org.Apache.REEF.Common/Telemetry/CounterSinkThreshold.cs new file mode 100644 index 0000000..0f458c0 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Common/Telemetry/CounterSinkThreshold.cs @@ -0,0 +1,26 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +using Org.Apache.REEF.Tang.Annotations; + +namespace Org.Apache.REEF.Common.Telemetry +{ + [NamedParameter(Documentation = "Threshold to trigger the sink.", ShortName = "CounterSinkThreshold", DefaultValue = "1")] + public class CounterSinkThreshold : Name<int> + { + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/reef/blob/b90a3f69/lang/cs/Org.Apache.REEF.Common/Telemetry/CountersData.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Common/Telemetry/CountersData.cs b/lang/cs/Org.Apache.REEF.Common/Telemetry/CountersData.cs new file mode 100644 index 0000000..55393b0 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Common/Telemetry/CountersData.cs @@ -0,0 +1,103 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +using System; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Linq; +using Org.Apache.REEF.Tang.Annotations; +using Org.Apache.REEF.Utilities.Logging; + +namespace Org.Apache.REEF.Common.Telemetry +{ + /// <summary> + /// This class maintains a collection of the data for all the counters for metrics service. + /// When new counter data is received, the data in the collection will be updated. + /// After the data is processed, the increment since last process will be reset. + /// </summary> + internal sealed class CountersData + { + private static readonly Logger Logger = Logger.GetLogger(typeof(CountersData)); + + /// <summary> + /// Registration of counters + /// </summary> + private readonly IDictionary<string, CounterData> _counterMap = new ConcurrentDictionary<string, CounterData>(); + + [Inject] + private CountersData() + { + } + + /// <summary> + /// Update counters + /// </summary> + /// <param name="counters"></param> + internal void Update(ICounters counters) + { + foreach (var counter in counters.GetCounters()) + { + CounterData counterData; + if (_counterMap.TryGetValue(counter.Name, out counterData)) + { + counterData.UpdateCounter(counter); + } + else + { + _counterMap.Add(counter.Name, new CounterData(counter, counter.Value)); + } + + Logger.Log(Level.Verbose, "Counter name: {0}, value: {1}, description: {2}, time: {3}, incrementSinceLastSink: {4}.", + counter.Name, counter.Value, counter.Description, new DateTime(counter.Timestamp), _counterMap[counter.Name].IncrementSinceLastSink); + } + } + + /// <summary> + /// Reset increment since last sink for each counter + /// </summary> + internal void Reset() + { + foreach (var c in _counterMap.Values) + { + c.ResetSinceLastSink(); + } + } + + /// <summary> + /// Convert the counter data into ISet for sink + /// </summary> + /// <returns></returns> + internal ISet<KeyValuePair<string, string>> GetCounterData() + { + var set = new HashSet<KeyValuePair<string, string>>(); + foreach (var c in _counterMap) + { + set.Add(c.Value.GetKeyValuePair()); + } + return set; + } + + /// <summary> + /// The condition that triggers the sink. The condition can be modified later. + /// </summary> + /// <returns></returns> + internal bool TriggerSink(int counterSinkThreshold) + { + return _counterMap.Values.Sum(e => e.IncrementSinceLastSink) > counterSinkThreshold; + } + } +} http://git-wip-us.apache.org/repos/asf/reef/blob/b90a3f69/lang/cs/Org.Apache.REEF.Common/Telemetry/DefaultMetricsSink.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Common/Telemetry/DefaultMetricsSink.cs b/lang/cs/Org.Apache.REEF.Common/Telemetry/DefaultMetricsSink.cs new file mode 100644 index 0000000..d302812 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Common/Telemetry/DefaultMetricsSink.cs @@ -0,0 +1,57 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +using System.Collections.Generic; +using Org.Apache.REEF.Tang.Annotations; +using Org.Apache.REEF.Utilities.Logging; + +namespace Org.Apache.REEF.Common.Telemetry +{ + /// <summary> + /// This default IMetricsSink is just an example of IMetricsSink + /// Here the data is logged in Sink() method + /// It is more useful in test + /// </summary> + internal sealed class DefaultMetricsSink : IMetricsSink + { + private static readonly Logger Logger = Logger.GetLogger(typeof(DefaultMetricsSink)); + + [Inject] + private DefaultMetricsSink() + { + } + + /// <summary> + /// Simple sink for metrics data + /// </summary> + /// <param name="metrics"></param> + public void Sink(ISet<KeyValuePair<string, string>> metrics) + { + foreach (var m in metrics) + { + Logger.Log(Level.Info, "Metrics - Name:{0}, Value:{1}.", m.Key, m.Value); + } + } + + /// <summary> + /// This is intentionally empty as we don't have any resource to release in the implementation. + /// </summary> + public void Dispose() + { + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/reef/blob/b90a3f69/lang/cs/Org.Apache.REEF.Common/Telemetry/IMetricsSink.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Common/Telemetry/IMetricsSink.cs b/lang/cs/Org.Apache.REEF.Common/Telemetry/IMetricsSink.cs new file mode 100644 index 0000000..b27bd3d --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Common/Telemetry/IMetricsSink.cs @@ -0,0 +1,33 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +using System; +using System.Collections.Generic; +using Org.Apache.REEF.Tang.Annotations; + +namespace Org.Apache.REEF.Common.Telemetry +{ + /// <summary> + /// Interface for metrics sink. + /// It is used to output IMRU metrics. + /// </summary> + [DefaultImplementation(typeof(DefaultMetricsSink))] + public interface IMetricsSink : IDisposable + { + void Sink(ISet<KeyValuePair<string, string>> metrics); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/reef/blob/b90a3f69/lang/cs/Org.Apache.REEF.Common/Telemetry/MetricSinks.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Common/Telemetry/MetricSinks.cs b/lang/cs/Org.Apache.REEF.Common/Telemetry/MetricSinks.cs new file mode 100644 index 0000000..09b7598 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Common/Telemetry/MetricSinks.cs @@ -0,0 +1,31 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +using System; +using System.Collections.Generic; +using Org.Apache.REEF.Tang.Annotations; + +namespace Org.Apache.REEF.Common.Telemetry +{ + /// <summary> + /// A named parameter for a set of IMetricsSink. + /// </summary> + [NamedParameter(DefaultClasses = new Type[] { typeof(DefaultMetricsSink) })] + public sealed class MetricSinks : Name<ISet<IMetricsSink>> + { + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/reef/blob/b90a3f69/lang/cs/Org.Apache.REEF.Common/Telemetry/MetricsService.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Common/Telemetry/MetricsService.cs b/lang/cs/Org.Apache.REEF.Common/Telemetry/MetricsService.cs index 7ff3c26..75c8cc2 100644 --- a/lang/cs/Org.Apache.REEF.Common/Telemetry/MetricsService.cs +++ b/lang/cs/Org.Apache.REEF.Common/Telemetry/MetricsService.cs @@ -16,9 +16,9 @@ // under the License. using System; -using System.Collections.Concurrent; using System.Collections.Generic; using System.Linq; +using System.Threading.Tasks; using Org.Apache.REEF.Common.Context; using Org.Apache.REEF.Tang.Annotations; using Org.Apache.REEF.Utilities; @@ -34,14 +34,35 @@ namespace Org.Apache.REEF.Common.Telemetry internal sealed class MetricsService : IObserver<IContextMessage> { private static readonly Logger Logger = Logger.GetLogger(typeof(MetricsService)); - private readonly IDictionary<string, ICounter> _counters = new ConcurrentDictionary<string, ICounter>(); + + /// <summary> + /// Contains Counters received in the Metrics service + /// </summary> + private readonly CountersData _countersData; + + /// <summary> + /// A set of metrics sinks + /// </summary> + private readonly ISet<IMetricsSink> _metricsSinks; + + /// <summary> + /// The threshold that triggers the sinks. + /// Currently only one threshold is defined for all the counters. Later, it can be extended to define a threshold per counter. + /// </summary> + private readonly int _counterSinkThreshold; /// <summary> /// It can be bound with driver configuration as a context message handler /// </summary> [Inject] - private MetricsService() + private MetricsService( + [Parameter(typeof(MetricSinks))] ISet<IMetricsSink> metricsSinks, + [Parameter(typeof(CounterSinkThreshold))] int counterSinkThreshold, + CountersData countersData) { + _metricsSinks = metricsSinks; + _counterSinkThreshold = counterSinkThreshold; + _countersData = countersData; } /// <summary> @@ -54,24 +75,34 @@ namespace Org.Apache.REEF.Common.Telemetry var counters = new EvaluatorMetrics(msgReceived).GetMetricsCounters(); Logger.Log(Level.Info, "Received {0} counters with context message: {1}.", counters.GetCounters().Count(), msgReceived); - foreach (var counter in counters.GetCounters()) + _countersData.Update(counters); + + if (_countersData.TriggerSink(_counterSinkThreshold)) { - ICounter c; - if (_counters.TryGetValue(counter.Name, out c)) + Sink(_countersData.GetCounterData()); + _countersData.Reset(); + } + } + + /// <summary> + /// Call each Sink to sink the data in the counters + /// </summary> + private void Sink(ISet<KeyValuePair<string, string>> set) + { + foreach (var s in _metricsSinks) + { + try { - //// TODO: [REEF-1748] The following cases need to be considered in determine how to update the counter: - //// if evaluator contains the aggregated values, the value will override existing value - //// if evaluator only keep delta, the value should be added at here. But the value in the evaluator should be reset after message is sent - //// For the counters from multiple evaluators with the same counter name, the value should be aggregated here - //// We also need to consider failure cases. - _counters[counter.Name] = counter; + Task.Run(() => s.Sink(set)); } - else + catch (Exception e) { - _counters.Add(counter.Name, counter); + Logger.Log(Level.Error, "Exception happens during the sink for Sink {0} with Exception: {1}.", s.GetType().AssemblyQualifiedName, e); + } + finally + { + s.Dispose(); } - - Logger.Log(Level.Verbose, "Counter name: {0}, value: {1}, description: {2}, time: {3}.", counter.Name, counter.Value, counter.Description, new DateTime(counter.Timestamp)); } } http://git-wip-us.apache.org/repos/asf/reef/blob/b90a3f69/lang/cs/Org.Apache.REEF.Driver/MetricsServiceConfigurationModule.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Driver/MetricsServiceConfigurationModule.cs b/lang/cs/Org.Apache.REEF.Driver/MetricsServiceConfigurationModule.cs index b7e75d5..957d505 100644 --- a/lang/cs/Org.Apache.REEF.Driver/MetricsServiceConfigurationModule.cs +++ b/lang/cs/Org.Apache.REEF.Driver/MetricsServiceConfigurationModule.cs @@ -24,8 +24,14 @@ using Org.Apache.REEF.Tang.Util; namespace Org.Apache.REEF.Driver { + /// <summary> + /// Configuration module for MetricsService. + /// </summary> public sealed class MetricsServiceConfigurationModule : ConfigurationModuleBuilder { + public static readonly OptionalImpl<IMetricsSink> OnMetricsSink = new OptionalImpl<IMetricsSink>(); + public static readonly OptionalParameter<int> CounterSinkThreshold = new OptionalParameter<int>(); + /// <summary> /// It provides the configuration for MetricsService /// </summary> @@ -33,6 +39,8 @@ namespace Org.Apache.REEF.Driver .BindSetEntry<DriverBridgeConfigurationOptions.ContextMessageHandlers, MetricsService, IObserver<IContextMessage>>( GenericType<DriverBridgeConfigurationOptions.ContextMessageHandlers>.Class, GenericType<MetricsService>.Class) + .BindSetEntry(GenericType<MetricSinks>.Class, OnMetricsSink) + .BindNamedParameter(GenericType<CounterSinkThreshold>.Class, CounterSinkThreshold) .Build(); } } http://git-wip-us.apache.org/repos/asf/reef/blob/b90a3f69/lang/cs/Org.Apache.REEF.Tests/Functional/Telemetry/TestMetricsMessage.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/Telemetry/TestMetricsMessage.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/Telemetry/TestMetricsMessage.cs index 12eb9f9..f447f75 100644 --- a/lang/cs/Org.Apache.REEF.Tests/Functional/Telemetry/TestMetricsMessage.cs +++ b/lang/cs/Org.Apache.REEF.Tests/Functional/Telemetry/TestMetricsMessage.cs @@ -15,6 +15,7 @@ // specific language governing permissions and limitations // under the License. +using Org.Apache.REEF.Common.Telemetry; using Org.Apache.REEF.Driver; using Org.Apache.REEF.Tang.Implementations.Configuration; using Org.Apache.REEF.Tang.Interface; @@ -51,7 +52,10 @@ namespace Org.Apache.REEF.Tests.Functional.Telemetry .Set(DriverConfiguration.CustomTraceLevel, Level.Info.ToString()) .Build(); - var c2 = MetricsServiceConfigurationModule.ConfigurationModule.Build(); + var c2 = MetricsServiceConfigurationModule.ConfigurationModule + .Set(MetricsServiceConfigurationModule.OnMetricsSink, GenericType<DefaultMetricsSink>.Class) + .Set(MetricsServiceConfigurationModule.CounterSinkThreshold, "5") + .Build(); return Configurations.Merge(c1, c2); }
