Repository: flink Updated Branches: refs/heads/master 307eae6e2 -> 1db14fc06
[FLINK-3950] Implement MeterView This closes #2443. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1db14fc0 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1db14fc0 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1db14fc0 Branch: refs/heads/master Commit: 1db14fc06198e5519c7e71c418f5277bdc2104fc Parents: 307eae6 Author: zentol <[email protected]> Authored: Wed Aug 31 15:47:17 2016 +0200 Committer: zentol <[email protected]> Committed: Wed Oct 26 12:16:49 2016 +0200 ---------------------------------------------------------------------- .../org/apache/flink/metrics/MeterView.java | 90 +++++++++++++++++++ .../java/org/apache/flink/metrics/View.java | 31 +++++++ .../org/apache/flink/metrics/MeterViewTest.java | 93 +++++++++++++++++++ .../flink/runtime/metrics/MetricRegistry.java | 21 +++-- .../flink/runtime/metrics/ViewUpdater.java | 95 ++++++++++++++++++++ 5 files changed, 325 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/1db14fc0/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/MeterView.java ---------------------------------------------------------------------- diff --git a/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/MeterView.java b/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/MeterView.java new file mode 100644 index 0000000..40dd39f --- /dev/null +++ b/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/MeterView.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.metrics; + +/** + * A MeterView provides an average rate of events per second over a given time period. + * + * The primary advantage of this class is that the rate is neither updated by the computing thread nor for every event. + * Instead, a history of counts is maintained that is updated in regular intervals by a background thread. From this + * history a rate is derived on demand, which represents the average rate of events over the given time span. If the + * rate is never requested there is thus no overhead for the computation of the rate. + * + * Setting the time span to a low value reduces memory-consumption and will more accurately report short-term changes. + * The minimum value possible is {@link View#UPDATE_INTERVAL_SECONDS}. + * A high value in turn increases memory-consumption since a longer history has to be maintained. + * + * The events are counted by a {@link Counter}. + */ +public class MeterView implements Meter, View { + /** The underlying counter maintaining the count */ + private final Counter counter; + /** The time-span over which the average is calculated */ + private final int timeSpanInSeconds; + /** Circular array containing the history of values */ + private final long[] values; + /** The index in the array for the current time */ + private int time = 0; + + /** Signals whether a rate was already calculated for the current time-frame */ + private boolean updateRate = false; + /** The last rate we computed */ + private double currentRate = 0; + + public MeterView(int timeSpanInSeconds) { + this(new SimpleCounter(), timeSpanInSeconds); + } + + public MeterView(Counter counter, int timeSpanInSeconds) { + this.counter = counter; + this.timeSpanInSeconds = timeSpanInSeconds - (timeSpanInSeconds % UPDATE_INTERVAL_SECONDS); + this.values = new long[this.timeSpanInSeconds / UPDATE_INTERVAL_SECONDS + 1]; + } + + @Override + public void markEvent() { + this.counter.inc(); + } + + @Override + public void markEvent(long n) { + this.counter.inc(n); + } + + @Override + public long getCount() { + return counter.getCount(); + } + + @Override + public double getRate() { + if (updateRate) { + final int time = this.time; + currentRate = ((double) (values[time] - values[(time + 1) % values.length]) / timeSpanInSeconds); + updateRate = false; + } + return currentRate; + } + + @Override + public void update() { + time = (time + 1) % values.length; + values[time] = counter.getCount(); + updateRate = true; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/1db14fc0/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/View.java ---------------------------------------------------------------------- diff --git a/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/View.java b/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/View.java new file mode 100644 index 0000000..1780130 --- /dev/null +++ b/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/View.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.metrics; + +/** + * An interface for metrics which should be updated in regular intervals by a background thread. + */ +public interface View { + /** The interval in which metrics are updated */ + int UPDATE_INTERVAL_SECONDS = 5; + + /** + * This method will be called regularly to update the metric. + */ + void update(); +} http://git-wip-us.apache.org/repos/asf/flink/blob/1db14fc0/flink-metrics/flink-metrics-core/src/test/java/org/apache/flink/metrics/MeterViewTest.java ---------------------------------------------------------------------- diff --git a/flink-metrics/flink-metrics-core/src/test/java/org/apache/flink/metrics/MeterViewTest.java b/flink-metrics/flink-metrics-core/src/test/java/org/apache/flink/metrics/MeterViewTest.java new file mode 100644 index 0000000..8ba298f --- /dev/null +++ b/flink-metrics/flink-metrics-core/src/test/java/org/apache/flink/metrics/MeterViewTest.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.metrics; + +import org.junit.Test; + +import static org.junit.Assert.assertEquals; + +public class MeterViewTest { + @Test + public void testGetCount() { + Counter c = new SimpleCounter(); + c.inc(5); + Meter m = new MeterView(c, 60); + + assertEquals(5, m.getCount()); + } + + @Test + public void testMarkEvent() { + Counter c = new SimpleCounter(); + Meter m = new MeterView(c, 60); + + assertEquals(0, m.getCount()); + m.markEvent(); + assertEquals(1, m.getCount()); + m.markEvent(2); + assertEquals(3, m.getCount()); + } + + @Test + public void testGetRate() { + Counter c = new SimpleCounter(); + MeterView m = new MeterView(c, 60); + + // values = [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0] + for (int x = 0; x < 12; x++) { + m.markEvent(10); + m.update(); + } + // values = [0, 10, 20, 30, 40, 50, 60, 70, 80, 90, 100, 110, 120] + assertEquals(2.0, m.getRate(), 0.1); // 120 - 0 / 60 + + for (int x = 0; x < 12; x++) { + m.markEvent(10); + m.update(); + } + // values = [130, 140, 150, 160, 170, 180, 190, 200, 210, 220, 230, 240, 120] + assertEquals(2.0, m.getRate(), 0.1); // 240 - 120 / 60 + + for (int x = 0; x < 6; x++) { + m.markEvent(20); + m.update(); + } + // values = [280, 300, 320, 340, 360, 180, 190, 200, 210, 220, 230, 240, 260] + assertEquals(3.0, m.getRate(), 0.1); // 360 - 180 / 60 + + for (int x = 0; x < 6; x++) { + m.markEvent(20); + m.update(); + } + // values = [280, 300, 320, 340, 360, 380, 400, 420, 440, 460, 480, 240, 260] + assertEquals(4.0, m.getRate(), 0.1); // 480 - 240 / 60 + + for (int x = 0; x < 6; x++) { + m.update(); + } + // values = [480, 480, 480, 480, 360, 380, 400, 420, 440, 460, 480, 480, 480] + assertEquals(2.0, m.getRate(), 0.1); // 480 - 360 / 60 + + for (int x = 0; x < 6; x++) { + m.update(); + } + // values = [480, 480, 480, 480, 480, 480, 480, 480, 480, 480, 480, 480, 480] + assertEquals(0.0, m.getRate(), 0.1); // 480 - 480 / 60 + + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/1db14fc0/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistry.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistry.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistry.java index 219927d..b514fd1 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistry.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistry.java @@ -26,6 +26,7 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.metrics.Metric; import org.apache.flink.metrics.MetricConfig; import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.metrics.View; import org.apache.flink.metrics.reporter.MetricReporter; import org.apache.flink.metrics.reporter.Scheduled; import org.apache.flink.runtime.clusterframework.types.ResourceID; @@ -54,6 +55,8 @@ public class MetricRegistry { private ScheduledExecutorService executor; private ActorRef queryService; + private ViewUpdater viewUpdater; + private final ScopeFormats scopeFormats; private final char globalDelimiter; private final List<Character> delimiters = new ArrayList<>(); @@ -70,11 +73,12 @@ public class MetricRegistry { List<Tuple2<String, Configuration>> reporterConfigurations = config.getReporterConfigurations(); + this.executor = Executors.newSingleThreadScheduledExecutor(); + if (reporterConfigurations.isEmpty()) { // no reporters defined // by default, don't report anything LOG.info("No metrics reporter configured, no metrics will be exposed/reported."); - this.executor = null; } else { // we have some reporters so for (Tuple2<String, Configuration> reporterConfiguration: reporterConfigurations) { @@ -113,9 +117,6 @@ public class MetricRegistry { reporterInstance.open(metricConfig); if (reporterInstance instanceof Scheduled) { - if (executor == null) { - executor = Executors.newSingleThreadScheduledExecutor(); - } LOG.info("Periodically reporting metrics in intervals of {} {} for reporter {} of type {}.", period, timeunit.name(), namedReporter, className); executor.scheduleWithFixedDelay( @@ -133,7 +134,6 @@ public class MetricRegistry { this.delimiters.add(delimiterForReporter.charAt(0)); } catch (Throwable t) { - shutdownExecutor(); LOG.error("Could not instantiate metrics reporter {}. Metrics might not be exposed/reported.", namedReporter, t); } } @@ -242,6 +242,12 @@ public class MetricRegistry { if (queryService != null) { MetricQueryService.notifyOfAddedMetric(queryService, metric, metricName, group); } + if (metric instanceof View) { + if (viewUpdater == null) { + viewUpdater = new ViewUpdater(executor); + } + viewUpdater.notifyOfAddedView((View) metric); + } } catch (Exception e) { LOG.error("Error while registering metric.", e); } @@ -268,6 +274,11 @@ public class MetricRegistry { if (queryService != null) { MetricQueryService.notifyOfRemovedMetric(queryService, metric); } + if (metric instanceof View) { + if (viewUpdater != null) { + viewUpdater.notifyOfRemovedView((View) metric); + } + } } catch (Exception e) { LOG.error("Error while registering metric.", e); } http://git-wip-us.apache.org/repos/asf/flink/blob/1db14fc0/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/ViewUpdater.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/ViewUpdater.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/ViewUpdater.java new file mode 100644 index 0000000..e4d0596 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/ViewUpdater.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.runtime.metrics; + +import org.apache.flink.metrics.View; + +import java.util.HashSet; +import java.util.Set; +import java.util.TimerTask; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +import static org.apache.flink.metrics.View.UPDATE_INTERVAL_SECONDS; + +/** + * The ViewUpdater is responsible for updating all metrics that implement the {@link View} interface. + */ +public class ViewUpdater { + private final Set<View> toAdd = new HashSet<>(); + private final Set<View> toRemove = new HashSet<>(); + + private final Object lock = new Object(); + + public ViewUpdater(ScheduledExecutorService executor) { + executor.scheduleWithFixedDelay(new ViewUpdaterTask(lock, toAdd, toRemove), 5, UPDATE_INTERVAL_SECONDS, TimeUnit.SECONDS); + } + + /** + * Notifies this ViewUpdater of a new metric that should be regularly updated. + * + * @param view metric that should be regularly updated + */ + public void notifyOfAddedView(View view) { + synchronized (lock) { + toAdd.add(view); + } + } + + /** + * Notifies this ViewUpdater of a metric that should no longer be regularly updated. + * + * @param view metric that should no longer be regularly updated + */ + public void notifyOfRemovedView(View view) { + synchronized (lock) { + toRemove.add(view); + } + } + + /** + * The TimerTask doing the actual updating. + */ + private static class ViewUpdaterTask extends TimerTask { + private final Object lock; + private final Set<View> views; + private final Set<View> toAdd; + private final Set<View> toRemove; + + private ViewUpdaterTask(Object lock, Set<View> toAdd, Set<View> toRemove) { + this.lock = lock; + this.views = new HashSet<>(); + this.toAdd = toAdd; + this.toRemove = toRemove; + } + + @Override + public void run() { + for (View toUpdate : this.views) { + toUpdate.update(); + } + + synchronized (lock) { + views.addAll(toAdd); + toAdd.clear(); + views.removeAll(toRemove); + toRemove.clear(); + } + } + } +}
