Repository: flink
Updated Branches:
  refs/heads/master 307eae6e2 -> 1db14fc06


[FLINK-3950] Implement MeterView

This closes #2443.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1db14fc0
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1db14fc0
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1db14fc0

Branch: refs/heads/master
Commit: 1db14fc06198e5519c7e71c418f5277bdc2104fc
Parents: 307eae6
Author: zentol <[email protected]>
Authored: Wed Aug 31 15:47:17 2016 +0200
Committer: zentol <[email protected]>
Committed: Wed Oct 26 12:16:49 2016 +0200

----------------------------------------------------------------------
 .../org/apache/flink/metrics/MeterView.java     | 90 +++++++++++++++++++
 .../java/org/apache/flink/metrics/View.java     | 31 +++++++
 .../org/apache/flink/metrics/MeterViewTest.java | 93 +++++++++++++++++++
 .../flink/runtime/metrics/MetricRegistry.java   | 21 +++--
 .../flink/runtime/metrics/ViewUpdater.java      | 95 ++++++++++++++++++++
 5 files changed, 325 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/1db14fc0/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/MeterView.java
----------------------------------------------------------------------
diff --git 
a/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/MeterView.java
 
b/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/MeterView.java
new file mode 100644
index 0000000..40dd39f
--- /dev/null
+++ 
b/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/MeterView.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.metrics;
+
+/**
+ * A MeterView provides an average rate of events per second over a given time 
period.
+ * 
+ * The primary advantage of this class is that the rate is neither updated by 
the computing thread nor for every event.
+ * Instead, a history of counts is maintained that is updated in regular 
intervals by a background thread. From this
+ * history a rate is derived on demand, which represents the average rate of 
events over the given time span. If the
+ * rate is never requested there is thus no overhead for the computation of 
the rate.
+ * 
+ * Setting the time span to a low value reduces memory-consumption and will 
more accurately report short-term changes.
+ * The minimum value possible is {@link View#UPDATE_INTERVAL_SECONDS}.
+ * A high value in turn increases memory-consumption since a longer history 
has to be maintained.
+ * 
+ * The events are counted by a {@link Counter}.
+ */
+public class MeterView implements Meter, View {
+       /** The underlying counter maintaining the count */
+       private final Counter counter;
+       /** The time-span over which the average is calculated */
+       private final int timeSpanInSeconds;
+       /** Circular array containing the history of values */
+       private final long[] values;
+       /** The index in the array for the current time */
+       private int time = 0;
+
+       /** Signals whether a rate was already calculated for the current 
time-frame */
+       private boolean updateRate = false;
+       /** The last rate we computed */
+       private double currentRate = 0;
+
+       public MeterView(int timeSpanInSeconds) {
+               this(new SimpleCounter(), timeSpanInSeconds);
+       }
+
+       public MeterView(Counter counter, int timeSpanInSeconds) {
+               this.counter = counter;
+               this.timeSpanInSeconds = timeSpanInSeconds - (timeSpanInSeconds 
% UPDATE_INTERVAL_SECONDS);
+               this.values = new long[this.timeSpanInSeconds / 
UPDATE_INTERVAL_SECONDS + 1];
+       }
+
+       @Override
+       public void markEvent() {
+               this.counter.inc();
+       }
+
+       @Override
+       public void markEvent(long n) {
+               this.counter.inc(n);
+       }
+
+       @Override
+       public long getCount() {
+               return counter.getCount();
+       }
+
+       @Override
+       public double getRate() {
+               if (updateRate) {
+                       final int time = this.time;
+                       currentRate =  ((double) (values[time] - values[(time + 
1) % values.length]) / timeSpanInSeconds);
+                       updateRate = false;
+               }
+               return currentRate;
+       }
+
+       @Override
+       public void update() {
+               time = (time + 1) % values.length;
+               values[time] = counter.getCount();
+               updateRate = true;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/1db14fc0/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/View.java
----------------------------------------------------------------------
diff --git 
a/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/View.java
 
b/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/View.java
new file mode 100644
index 0000000..1780130
--- /dev/null
+++ 
b/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/View.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.metrics;
+
+/**
+ * An interface for metrics which should be updated in regular intervals by a 
background thread.
+ */
+public interface View {
+       /** The interval in which metrics are updated */
+       int UPDATE_INTERVAL_SECONDS = 5;
+
+       /**
+        * This method will be called regularly to update the metric.
+        */
+       void update();
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/1db14fc0/flink-metrics/flink-metrics-core/src/test/java/org/apache/flink/metrics/MeterViewTest.java
----------------------------------------------------------------------
diff --git 
a/flink-metrics/flink-metrics-core/src/test/java/org/apache/flink/metrics/MeterViewTest.java
 
b/flink-metrics/flink-metrics-core/src/test/java/org/apache/flink/metrics/MeterViewTest.java
new file mode 100644
index 0000000..8ba298f
--- /dev/null
+++ 
b/flink-metrics/flink-metrics-core/src/test/java/org/apache/flink/metrics/MeterViewTest.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.metrics;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class MeterViewTest {
+       @Test
+       public void testGetCount() {
+               Counter c = new SimpleCounter();
+               c.inc(5);
+               Meter m = new MeterView(c, 60);
+
+               assertEquals(5, m.getCount());
+       }
+
+       @Test
+       public void testMarkEvent() {
+               Counter c = new SimpleCounter();
+               Meter m = new MeterView(c, 60);
+
+               assertEquals(0, m.getCount());
+               m.markEvent();
+               assertEquals(1, m.getCount());
+               m.markEvent(2);
+               assertEquals(3, m.getCount());
+       }
+
+       @Test
+       public void testGetRate() {
+               Counter c = new SimpleCounter();
+               MeterView m = new MeterView(c, 60);
+
+               // values = [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
+               for (int x = 0; x < 12; x++) {
+                       m.markEvent(10);
+                       m.update();
+               }
+               // values = [0, 10, 20, 30, 40, 50, 60, 70, 80, 90, 100, 110, 
120]
+               assertEquals(2.0, m.getRate(), 0.1); // 120 - 0 / 60
+
+               for (int x = 0; x < 12; x++) {
+                       m.markEvent(10);
+                       m.update();
+               }
+               // values = [130, 140, 150, 160, 170, 180, 190, 200, 210, 220, 
230, 240, 120]
+               assertEquals(2.0, m.getRate(), 0.1); // 240 - 120 / 60
+
+               for (int x = 0; x < 6; x++) {
+                       m.markEvent(20);
+                       m.update();
+               }
+               // values = [280, 300, 320, 340, 360, 180, 190, 200, 210, 220, 
230, 240, 260]
+               assertEquals(3.0, m.getRate(), 0.1); // 360 - 180 / 60
+
+               for (int x = 0; x < 6; x++) {
+                       m.markEvent(20);
+                       m.update();
+               }
+               // values = [280, 300, 320, 340, 360, 380, 400, 420, 440, 460, 
480, 240, 260]
+               assertEquals(4.0, m.getRate(), 0.1); // 480 - 240 / 60
+
+               for (int x = 0; x < 6; x++) {
+                       m.update();
+               }
+               // values = [480, 480, 480, 480, 360, 380, 400, 420, 440, 460, 
480, 480, 480]
+               assertEquals(2.0, m.getRate(), 0.1); // 480 - 360 / 60
+
+               for (int x = 0; x < 6; x++) {
+                       m.update();
+               }
+               // values = [480, 480, 480, 480, 480, 480, 480, 480, 480, 480, 
480, 480, 480]
+               assertEquals(0.0, m.getRate(), 0.1); // 480 - 480 / 60
+
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/1db14fc0/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistry.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistry.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistry.java
index 219927d..b514fd1 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistry.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistry.java
@@ -26,6 +26,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.metrics.Metric;
 import org.apache.flink.metrics.MetricConfig;
 import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.View;
 import org.apache.flink.metrics.reporter.MetricReporter;
 import org.apache.flink.metrics.reporter.Scheduled;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
@@ -54,6 +55,8 @@ public class MetricRegistry {
        private ScheduledExecutorService executor;
        private ActorRef queryService;
 
+       private ViewUpdater viewUpdater;
+
        private final ScopeFormats scopeFormats;
        private final char globalDelimiter;
        private final List<Character> delimiters = new ArrayList<>();
@@ -70,11 +73,12 @@ public class MetricRegistry {
 
                List<Tuple2<String, Configuration>> reporterConfigurations = 
config.getReporterConfigurations();
 
+               this.executor = Executors.newSingleThreadScheduledExecutor();
+
                if (reporterConfigurations.isEmpty()) {
                        // no reporters defined
                        // by default, don't report anything
                        LOG.info("No metrics reporter configured, no metrics 
will be exposed/reported.");
-                       this.executor = null;
                } else {
                        // we have some reporters so
                        for (Tuple2<String, Configuration> 
reporterConfiguration: reporterConfigurations) {
@@ -113,9 +117,6 @@ public class MetricRegistry {
                                        reporterInstance.open(metricConfig);
 
                                        if (reporterInstance instanceof 
Scheduled) {
-                                               if (executor == null) {
-                                                       executor = 
Executors.newSingleThreadScheduledExecutor();
-                                               }
                                                LOG.info("Periodically 
reporting metrics in intervals of {} {} for reporter {} of type {}.", period, 
timeunit.name(), namedReporter, className);
 
                                                executor.scheduleWithFixedDelay(
@@ -133,7 +134,6 @@ public class MetricRegistry {
                                        
this.delimiters.add(delimiterForReporter.charAt(0));
                                }
                                catch (Throwable t) {
-                                       shutdownExecutor();
                                        LOG.error("Could not instantiate 
metrics reporter {}. Metrics might not be exposed/reported.", namedReporter, t);
                                }
                        }
@@ -242,6 +242,12 @@ public class MetricRegistry {
                        if (queryService != null) {
                                
MetricQueryService.notifyOfAddedMetric(queryService, metric, metricName, group);
                        }
+                       if (metric instanceof View) {
+                               if (viewUpdater == null) {
+                                       viewUpdater = new ViewUpdater(executor);
+                               }
+                               viewUpdater.notifyOfAddedView((View) metric);
+                       }
                } catch (Exception e) {
                        LOG.error("Error while registering metric.", e);
                }
@@ -268,6 +274,11 @@ public class MetricRegistry {
                        if (queryService != null) {
                                
MetricQueryService.notifyOfRemovedMetric(queryService, metric);
                        }
+                       if (metric instanceof View) {
+                               if (viewUpdater != null) {
+                                       viewUpdater.notifyOfRemovedView((View) 
metric);
+                               }
+                       }
                } catch (Exception e) {
                        LOG.error("Error while registering metric.", e);
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/1db14fc0/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/ViewUpdater.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/ViewUpdater.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/ViewUpdater.java
new file mode 100644
index 0000000..e4d0596
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/ViewUpdater.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.metrics;
+
+import org.apache.flink.metrics.View;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.TimerTask;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.flink.metrics.View.UPDATE_INTERVAL_SECONDS;
+
+/**
+ * The ViewUpdater is responsible for updating all metrics that implement the 
{@link View} interface.
+ */
+public class ViewUpdater {
+       private final Set<View> toAdd = new HashSet<>();
+       private final Set<View> toRemove = new HashSet<>();
+
+       private final Object lock = new Object();
+
+       public ViewUpdater(ScheduledExecutorService executor) {
+               executor.scheduleWithFixedDelay(new ViewUpdaterTask(lock, 
toAdd, toRemove), 5, UPDATE_INTERVAL_SECONDS, TimeUnit.SECONDS);
+       }
+
+       /**
+        * Notifies this ViewUpdater of a new metric that should be regularly 
updated.
+        *
+        * @param view metric that should be regularly updated
+        */
+       public void notifyOfAddedView(View view) {
+               synchronized (lock) {
+                       toAdd.add(view);
+               }
+       }
+
+       /**
+        * Notifies this ViewUpdater of a metric that should no longer be 
regularly updated.
+        *
+        * @param view metric that should no longer be regularly updated
+        */
+       public void notifyOfRemovedView(View view) {
+               synchronized (lock) {
+                       toRemove.add(view);
+               }
+       }
+
+       /**
+        * The TimerTask doing the actual updating.
+        */
+       private static class ViewUpdaterTask extends TimerTask {
+               private final Object lock;
+               private final Set<View> views;
+               private final Set<View> toAdd;
+               private final Set<View> toRemove;
+
+               private ViewUpdaterTask(Object lock, Set<View> toAdd, Set<View> 
toRemove) {
+                       this.lock = lock;
+                       this.views = new HashSet<>();
+                       this.toAdd = toAdd;
+                       this.toRemove = toRemove;
+               }
+
+               @Override
+               public void run() {
+                       for (View toUpdate : this.views) {
+                               toUpdate.update();
+                       }
+
+                       synchronized (lock) {
+                               views.addAll(toAdd);
+                               toAdd.clear();
+                               views.removeAll(toRemove);
+                               toRemove.clear();
+                       }
+               }
+       }
+}

Reply via email to